123 lines
3.3 KiB
Go
123 lines
3.3 KiB
Go
package request
|
|
|
|
import (
|
|
"reflect"
|
|
"testing"
|
|
)
|
|
|
|
func TestEventFilter(t *testing.T) {
|
|
s := NewScheduler()
|
|
module1 := &testModule{name: "module1"}
|
|
module2 := &testModule{name: "module2"}
|
|
s.RegisterModule(module1, "module1")
|
|
s.RegisterModule(module2, "module2")
|
|
s.Start()
|
|
// startup process round without events
|
|
s.testWaitCh <- struct{}{}
|
|
module1.expProcess(t, nil)
|
|
module2.expProcess(t, nil)
|
|
srv := &testServer{}
|
|
// register server; both modules should receive server event
|
|
s.RegisterServer(srv)
|
|
s.testWaitCh <- struct{}{}
|
|
module1.expProcess(t, []Event{
|
|
{Type: EvRegistered, Server: srv},
|
|
})
|
|
module2.expProcess(t, []Event{
|
|
{Type: EvRegistered, Server: srv},
|
|
})
|
|
// let module1 send a request
|
|
srv.canRequest = 1
|
|
module1.sendReq = testRequest
|
|
s.Trigger()
|
|
// in first triggered round module1 sends the request, no events yet
|
|
s.testWaitCh <- struct{}{}
|
|
module1.expProcess(t, nil)
|
|
module2.expProcess(t, nil)
|
|
// server emits EvTimeout; only module1 should receive it
|
|
srv.eventCb(Event{Type: EvTimeout, Data: RequestResponse{ID: 1, Request: testRequest}})
|
|
s.testWaitCh <- struct{}{}
|
|
module1.expProcess(t, []Event{
|
|
{Type: EvTimeout, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}},
|
|
})
|
|
module2.expProcess(t, nil)
|
|
// unregister server; both modules should receive server event
|
|
s.UnregisterServer(srv)
|
|
s.testWaitCh <- struct{}{}
|
|
module1.expProcess(t, []Event{
|
|
// module1 should also receive EvFail on its pending request
|
|
{Type: EvFail, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}},
|
|
{Type: EvUnregistered, Server: srv},
|
|
})
|
|
module2.expProcess(t, []Event{
|
|
{Type: EvUnregistered, Server: srv},
|
|
})
|
|
// response after server unregistered; should be discarded
|
|
srv.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
|
|
s.testWaitCh <- struct{}{}
|
|
module1.expProcess(t, nil)
|
|
module2.expProcess(t, nil)
|
|
// no more process rounds expected; shut down
|
|
s.testWaitCh <- struct{}{}
|
|
module1.expNoMoreProcess(t)
|
|
module2.expNoMoreProcess(t)
|
|
s.Stop()
|
|
}
|
|
|
|
type testServer struct {
|
|
eventCb func(Event)
|
|
lastID ID
|
|
canRequest int
|
|
}
|
|
|
|
func (s *testServer) subscribe(eventCb func(Event)) {
|
|
s.eventCb = eventCb
|
|
}
|
|
|
|
func (s *testServer) canRequestNow() bool {
|
|
return s.canRequest > 0
|
|
}
|
|
|
|
func (s *testServer) sendRequest(req Request) ID {
|
|
s.canRequest--
|
|
s.lastID++
|
|
return s.lastID
|
|
}
|
|
|
|
func (s *testServer) fail(string) {}
|
|
func (s *testServer) unsubscribe() {}
|
|
|
|
type testModule struct {
|
|
name string
|
|
processed [][]Event
|
|
sendReq Request
|
|
}
|
|
|
|
func (m *testModule) Process(requester Requester, events []Event) {
|
|
m.processed = append(m.processed, events)
|
|
if m.sendReq != nil {
|
|
if cs := requester.CanSendTo(); len(cs) > 0 {
|
|
requester.Send(cs[0], m.sendReq)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *testModule) expProcess(t *testing.T, expEvents []Event) {
|
|
if len(m.processed) == 0 {
|
|
t.Errorf("Missing call to %s.Process", m.name)
|
|
return
|
|
}
|
|
events := m.processed[0]
|
|
m.processed = m.processed[1:]
|
|
if !reflect.DeepEqual(events, expEvents) {
|
|
t.Errorf("Call to %s.Process with wrong events (expected %v, got %v)", m.name, expEvents, events)
|
|
}
|
|
}
|
|
|
|
func (m *testModule) expNoMoreProcess(t *testing.T) {
|
|
for len(m.processed) > 0 {
|
|
t.Errorf("Unexpected call to %s.Process with events %v", m.name, m.processed[0])
|
|
m.processed = m.processed[1:]
|
|
}
|
|
}
|