ethlog: improve dispatch concurrency

This also fixes a deadlock in the tests.
This commit is contained in:
Felix Lange 2014-10-14 14:29:04 +02:00
parent e183880d8b
commit 4e95cecfb9
1 changed files with 57 additions and 54 deletions

View File

@ -29,20 +29,6 @@ func newPrintfLogMessage(level LogLevel, tag string, format string, v ...interfa
return &logMessage{level, true, fmt.Sprintf("[%s] %s", tag, fmt.Sprintf(format, v...))} return &logMessage{level, true, fmt.Sprintf("[%s] %s", tag, fmt.Sprintf(format, v...))}
} }
func (msg *logMessage) send(logger LogSystem) {
if msg.format {
logger.Printf(msg.msg)
} else {
logger.Println(msg.msg)
}
}
var logMessages chan (*logMessage)
var logSystems []LogSystem
var quit chan chan error
var drained chan bool
var mutex = sync.Mutex{}
type LogLevel uint8 type LogLevel uint8
const ( const (
@ -54,56 +40,80 @@ const (
DebugDetailLevel DebugDetailLevel
) )
func dispatch(msg *logMessage) { var (
for _, logSystem := range logSystems { mutex sync.RWMutex // protects logSystems
if logSystem.GetLogLevel() >= msg.LogLevel { logSystems []LogSystem
msg.send(logSystem)
} logMessages = make(chan *logMessage)
} drainWaitReq = make(chan chan struct{})
)
func init() {
go dispatchLoop()
} }
// log messages are dispatched to log writers func dispatchLoop() {
func start() { var drainWait []chan struct{}
dispatchDone := make(chan struct{})
pending := 0
for { for {
select { select {
case status := <-quit:
status <- nil
return
case msg := <-logMessages: case msg := <-logMessages:
dispatch(msg) go dispatch(msg, dispatchDone)
default: pending++
drained <- true // this blocks until a message is sent to the queue case waiter := <-drainWaitReq:
if pending == 0 {
close(waiter)
} else {
drainWait = append(drainWait, waiter)
}
case <-dispatchDone:
pending--
if pending == 0 {
for _, c := range drainWait {
close(c)
}
drainWait = nil
}
} }
} }
} }
func dispatch(msg *logMessage, done chan<- struct{}) {
mutex.RLock()
for _, sys := range logSystems {
if sys.GetLogLevel() >= msg.LogLevel {
if msg.format {
sys.Printf(msg.msg)
} else {
sys.Println(msg.msg)
}
}
}
mutex.RUnlock()
done <- struct{}{}
}
// send delivers a message to all installed log
// systems. it doesn't wait for the message to be
// written.
func send(msg *logMessage) { func send(msg *logMessage) {
logMessages <- msg logMessages <- msg
select {
case <-drained:
default:
}
} }
// Reset removes all registered log systems.
func Reset() { func Reset() {
mutex.Lock() mutex.Lock()
defer mutex.Unlock() logSystems = nil
if logSystems != nil { mutex.Unlock()
status := make(chan error)
quit <- status
select {
case <-drained:
default:
}
<-status
}
} }
// waits until log messages are drained (dispatched to log writers) // Flush waits until all current log messages have been dispatched to
// the active log systems.
func Flush() { func Flush() {
if logSystems != nil { waiter := make(chan struct{})
<-drained drainWaitReq <- waiter
} <-waiter
} }
type Logger struct { type Logger struct {
@ -115,16 +125,9 @@ func NewLogger(tag string) *Logger {
} }
func AddLogSystem(logSystem LogSystem) { func AddLogSystem(logSystem LogSystem) {
var mutex = &sync.Mutex{}
mutex.Lock() mutex.Lock()
defer mutex.Unlock()
if logSystems == nil {
logMessages = make(chan *logMessage, 10)
quit = make(chan chan error, 1)
drained = make(chan bool, 1)
go start()
}
logSystems = append(logSystems, logSystem) logSystems = append(logSystems, logSystem)
mutex.Unlock()
} }
func (logger *Logger) sendln(level LogLevel, v ...interface{}) { func (logger *Logger) sendln(level LogLevel, v ...interface{}) {