Don't stuck on active jobs

This commit is contained in:
Chocobozzz 2018-02-12 11:25:09 +01:00
parent e78720386f
commit 3df456380a
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
2 changed files with 29 additions and 2 deletions

View File

@ -32,7 +32,7 @@ class JobQueue {
private constructor () {} private constructor () {}
init () { async init () {
// Already initialized // Already initialized
if (this.initialized === true) return if (this.initialized === true) return
this.initialized = true this.initialized = true
@ -54,6 +54,8 @@ class JobQueue {
}) })
this.jobQueue.watchStuckJobs(5000) this.jobQueue.watchStuckJobs(5000)
await this.reactiveStuckJobs()
for (const handlerName of Object.keys(handlers)) { for (const handlerName of Object.keys(handlers)) {
this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
try { try {
@ -117,6 +119,31 @@ class JobQueue {
}) })
} }
private reactiveStuckJobs () {
const promises: Promise<any>[] = []
this.jobQueue.active((err, ids) => {
if (err) throw err
for (const id of ids) {
kue.Job.get(id, (err, job) => {
if (err) throw err
const p = new Promise((res, rej) => {
job.inactive(err => {
if (err) return rej(err)
return res()
})
})
promises.push(p)
})
}
})
return Promise.all(promises)
}
static get Instance () { static get Instance () {
return this.instance || (this.instance = new this()) return this.instance || (this.instance = new this())
} }

View File

@ -347,7 +347,7 @@ function goodbye () {
} }
async function isTherePendingRequests (servers: ServerInfo[]) { async function isTherePendingRequests (servers: ServerInfo[]) {
const states: JobState[] = [ 'inactive', 'active' ] const states: JobState[] = [ 'inactive', 'active', 'delayed' ]
const tasks: Promise<any>[] = [] const tasks: Promise<any>[] = []
let pendingRequests = false let pendingRequests = false