From 74c38902ecec507675e1e9033500addc87e4b7b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= Date: Tue, 29 Jan 2019 15:19:54 +0100 Subject: [PATCH] p2p/protocols: fix possible metrics loss in AccountingMetrics (#18956) --- p2p/protocols/reporter.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/p2p/protocols/reporter.go b/p2p/protocols/reporter.go index 215d4fe31b..9612b4a4d5 100644 --- a/p2p/protocols/reporter.go +++ b/p2p/protocols/reporter.go @@ -36,6 +36,13 @@ type AccountingMetrics struct { //for a graceful cleanup func (am *AccountingMetrics) Close() { close(am.reporter.quit) + // wait for reporter loop to finish saving metrics + // before reporter database is closed + select { + case <-time.After(10 * time.Second): + log.Error("accounting metrics reporter timeout") + case <-am.reporter.done: + } am.reporter.db.Close() } @@ -46,6 +53,7 @@ type reporter struct { interval time.Duration //duration at which the reporter will persist metrics db *leveldb.DB //the actual DB quit chan struct{} //quit the reporter loop + done chan struct{} //signal that reporter loop is done } //NewMetricsDB creates a new LevelDB instance used to persist metrics defined @@ -92,6 +100,7 @@ func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *Acc interval: d, db: db, quit: make(chan struct{}), + done: make(chan struct{}), } //run the go routine @@ -106,6 +115,9 @@ func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *Acc //run is the goroutine which periodically sends the metrics to the configured LevelDB func (r *reporter) run() { + // signal that the reporter loop is done + defer close(r.done) + intervalTicker := time.NewTicker(r.interval) for { @@ -121,6 +133,9 @@ func (r *reporter) run() { } case <-r.quit: //graceful shutdown + if err := r.save(); err != nil { + log.Error("unable to send metrics to LevelDB", "err", err) + } return } }