fix reactor engine main loop blocked to wait if drained

This commit is contained in:
zelig 2014-07-05 19:56:01 +01:00
parent d4300c406c
commit 5a2afc5754
1 changed files with 14 additions and 8 deletions

View File

@ -28,7 +28,7 @@ func (e *EventHandler) Post(event Event) {
select { select {
case ch <- event: case ch <- event:
default: default:
logger.Warnln("subscribing channel %d to event %s blocked. skipping", i, event.Name) logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
} }
} }
} }
@ -69,7 +69,7 @@ type ReactorEngine struct {
quit chan bool quit chan bool
shutdownChannel chan bool shutdownChannel chan bool
running bool running bool
drained bool drained chan bool
} }
func New() *ReactorEngine { func New() *ReactorEngine {
@ -77,6 +77,7 @@ func New() *ReactorEngine {
eventHandlers: make(map[string]*EventHandler), eventHandlers: make(map[string]*EventHandler),
eventChannel: make(chan Event), eventChannel: make(chan Event),
quit: make(chan bool, 1), quit: make(chan bool, 1),
drained: make(chan bool, 1),
shutdownChannel: make(chan bool, 1), shutdownChannel: make(chan bool, 1),
} }
} }
@ -94,8 +95,9 @@ func (reactor *ReactorEngine) Start() {
case event := <-reactor.eventChannel: case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events // needs to be called syncronously to keep order of events
reactor.dispatch(event) reactor.dispatch(event)
case reactor.drained <- true:
default: default:
reactor.drained = true reactor.drained <- true // blocking till message is coming in
} }
} }
reactor.lock.Lock() reactor.lock.Lock()
@ -113,14 +115,16 @@ func (reactor *ReactorEngine) Stop() {
reactor.lock.RLock() reactor.lock.RLock()
if reactor.running { if reactor.running {
reactor.quit <- true reactor.quit <- true
select {
case <-reactor.drained:
}
} }
reactor.lock.RUnlock() reactor.lock.RUnlock()
<-reactor.shutdownChannel <-reactor.shutdownChannel
} }
func (reactor *ReactorEngine) Flush() { func (reactor *ReactorEngine) Flush() {
for !reactor.drained { <-reactor.drained
}
} }
// Subscribe a channel to the specified event // Subscribe a channel to the specified event
@ -136,7 +140,7 @@ func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
} }
// Add the events channel to reactor event handler // Add the events channel to reactor event handler
eventHandler.Add(eventChannel) eventHandler.Add(eventChannel)
logger.Debugln("added new subscription to %s", event) logger.Debugf("added new subscription to %s", event)
} }
func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
@ -149,7 +153,7 @@ func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event)
if len == 0 { if len == 0 {
reactor.eventHandlers[event] = nil reactor.eventHandlers[event] = nil
} }
logger.Debugln("removed subscription to %s", event) logger.Debugf("removed subscription to %s", event)
} }
} }
@ -158,8 +162,10 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) {
defer reactor.lock.Unlock() defer reactor.lock.Unlock()
if reactor.running { if reactor.running {
reactor.drained = false
reactor.eventChannel <- Event{Resource: resource, Name: event} reactor.eventChannel <- Event{Resource: resource, Name: event}
select {
case <-reactor.drained:
}
} }
} }