Move to bullmq

This commit is contained in:
Chocobozzz 2022-08-08 10:42:08 +02:00
parent 5e2afe4290
commit 5a921e7b74
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
30 changed files with 211 additions and 148 deletions

View File

@ -109,7 +109,7 @@
"bencode": "^2.0.2", "bencode": "^2.0.2",
"bittorrent-tracker": "^9.0.0", "bittorrent-tracker": "^9.0.0",
"bluebird": "^3.5.0", "bluebird": "^3.5.0",
"bull": "^4.1.0", "bullmq": "^1.87.0",
"bytes": "^3.0.0", "bytes": "^3.0.0",
"chokidar": "^3.4.2", "chokidar": "^3.4.2",
"commander": "^9.0.0", "commander": "^9.0.0",
@ -183,7 +183,6 @@
"@types/bencode": "^2.0.0", "@types/bencode": "^2.0.0",
"@types/bluebird": "^3.5.33", "@types/bluebird": "^3.5.33",
"@types/body-parser": "^1.16.3", "@types/body-parser": "^1.16.3",
"@types/bull": "^3.15.0",
"@types/bytes": "^3.0.0", "@types/bytes": "^3.0.0",
"@types/chai": "^4.0.4", "@types/chai": "^4.0.4",
"@types/chai-json-schema": "^1.4.3", "@types/chai-json-schema": "^1.4.3",

View File

@ -352,6 +352,7 @@ async function startApplication () {
process.on('exit', () => { process.on('exit', () => {
JobQueue.Instance.terminate() JobQueue.Instance.terminate()
.catch(err => logger.error('Cannot terminate job queue.', { err }))
}) })
process.on('SIGINT', () => process.exit(0)) process.on('SIGINT', () => process.exit(0))

View File

@ -1,3 +1,4 @@
import { Job as BullJob } from 'bullmq'
import express from 'express' import express from 'express'
import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models'
import { isArray } from '../../helpers/custom-validators/misc' import { isArray } from '../../helpers/custom-validators/misc'
@ -25,7 +26,7 @@ jobsRouter.post('/pause',
jobsRouter.post('/resume', jobsRouter.post('/resume',
authenticate, authenticate,
ensureUserHasRight(UserRight.MANAGE_JOBS), ensureUserHasRight(UserRight.MANAGE_JOBS),
asyncMiddleware(resumeJobQueue) resumeJobQueue
) )
jobsRouter.get('/:state?', jobsRouter.get('/:state?',
@ -54,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) {
return res.sendStatus(HttpStatusCode.NO_CONTENT_204) return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
} }
async function resumeJobQueue (req: express.Request, res: express.Response) { function resumeJobQueue (req: express.Request, res: express.Response) {
await JobQueue.Instance.resume() JobQueue.Instance.resume()
return res.sendStatus(HttpStatusCode.NO_CONTENT_204) return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
} }
@ -82,7 +83,7 @@ async function listJobs (req: express.Request, res: express.Response) {
return res.json(result) return res.json(result)
} }
async function formatJob (job: any, state?: JobState): Promise<Job> { async function formatJob (job: BullJob, state?: JobState): Promise<Job> {
const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 const error = isArray(job.stacktrace) && job.stacktrace.length !== 0
? job.stacktrace[0] ? job.stacktrace[0]
: null : null
@ -90,9 +91,9 @@ async function formatJob (job: any, state?: JobState): Promise<Job> {
return { return {
id: job.id, id: job.id,
state: state || await job.getState(), state: state || await job.getState(),
type: job.queue.name as JobType, type: job.queueName as JobType,
data: job.data, data: job.data,
progress: await job.progress(), progress: job.progress as number,
priority: job.opts.priority, priority: job.opts.priority,
error, error,
createdAt: new Date(job.timestamp), createdAt: new Date(job.timestamp),

View File

@ -199,7 +199,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo
const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
await job.finished() await JobQueue.Instance.waitJob(job)
} }
const hls = video.getHLSPlaylist() const hls = video.getHLSPlaylist()
@ -208,7 +208,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo
const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
await job.finished() await JobQueue.Instance.waitJob(job)
} }
// Refresh video since files have changed // Refresh video since files have changed

View File

@ -17,6 +17,7 @@ import {
import { VideoPathManager } from '@server/lib/video-path-manager' import { VideoPathManager } from '@server/lib/video-path-manager'
import { buildNextVideoState } from '@server/lib/video-state' import { buildNextVideoState } from '@server/lib/video-state'
import { openapiOperationDoc } from '@server/middlewares/doc' import { openapiOperationDoc } from '@server/middlewares/doc'
import { VideoSourceModel } from '@server/models/video/video-source'
import { MVideoFile, MVideoFullLight } from '@server/types/models' import { MVideoFile, MVideoFullLight } from '@server/types/models'
import { getLowercaseExtension } from '@shared/core-utils' import { getLowercaseExtension } from '@shared/core-utils'
import { isAudioFile, uuidToShort } from '@shared/extra-utils' import { isAudioFile, uuidToShort } from '@shared/extra-utils'
@ -44,7 +45,6 @@ import {
import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update'
import { VideoModel } from '../../../models/video/video' import { VideoModel } from '../../../models/video/video'
import { VideoFileModel } from '../../../models/video/video-file' import { VideoFileModel } from '../../../models/video/video-file'
import { VideoSourceModel } from '@server/models/video/video-source'
const lTags = loggerTagsFactory('api', 'video') const lTags = loggerTagsFactory('api', 'video')
const auditLogger = auditLoggerFactory('videos') const auditLogger = auditLoggerFactory('videos')
@ -270,7 +270,7 @@ async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoF
const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' }
const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
await job.finished() await JobQueue.Instance.waitJob(job)
const refreshedVideo = await VideoModel.loadFull(video.id) const refreshedVideo = await VideoModel.loadFull(video.id)
if (!refreshedVideo) return if (!refreshedVideo) return

View File

@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models'
import { exists } from './misc' import { exists } from './misc'
import { jobTypes } from '@server/lib/job-queue/job-queue' import { jobTypes } from '@server/lib/job-queue/job-queue'
const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ] const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused', 'waiting-children' ]
function isValidJobState (value: JobState) { function isValidJobState (value: JobState) {
return exists(value) && jobStates.includes(value) return exists(value) && jobStates.includes(value)

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg' import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg'
import { execPromise } from '@server/helpers/core-utils' import { execPromise } from '@server/helpers/core-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger' import { logger, loggerTagsFactory } from '@server/helpers/logger'
@ -81,7 +81,7 @@ async function runCommand (options: {
command.on('progress', progress => { command.on('progress', progress => {
if (!progress.percent) return if (!progress.percent) return
job.progress(Math.round(progress.percent)) job.updateProgress(Math.round(progress.percent))
.catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() })) .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() }))
}) })
} }

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { FfmpegCommand } from 'fluent-ffmpeg' import { FfmpegCommand } from 'fluent-ffmpeg'
import { readFile, writeFile } from 'fs-extra' import { readFile, writeFile } from 'fs-extra'
import { dirname } from 'path' import { dirname } from 'path'

View File

@ -1,4 +1,4 @@
import { CronRepeatOptions, EveryRepeatOptions } from 'bull' import { RepeatOptions } from 'bullmq'
import { randomBytes } from 'crypto' import { randomBytes } from 'crypto'
import { invert } from 'lodash' import { invert } from 'lodash'
import { join } from 'path' import { join } from 'path'
@ -197,7 +197,7 @@ const JOB_TTL: { [id in JobType]: number } = {
'manage-video-torrent': 1000 * 3600 * 3, // 3 hours 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
} }
const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = { const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
'videos-views-stats': { 'videos-views-stats': {
cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
}, },

View File

@ -1,5 +1,5 @@
import { map } from 'bluebird' import { map } from 'bluebird'
import { Job } from 'bull' import { Job } from 'bullmq'
import { import {
isAnnounceActivityValid, isAnnounceActivityValid,
isDislikeActivityValid, isDislikeActivityValid,

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url'
import { ActivitypubFollowPayload } from '@shared/models' import { ActivitypubFollowPayload } from '@shared/models'
import { sanitizeHost } from '../../../helpers/core-utils' import { sanitizeHost } from '../../../helpers/core-utils'

View File

@ -1,5 +1,5 @@
import { map } from 'bluebird' import { map } from 'bluebird'
import { Job } from 'bull' import { Job } from 'bullmq'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
import { ActivitypubHttpBroadcastPayload } from '@shared/models' import { ActivitypubHttpBroadcastPayload } from '@shared/models'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { VideoModel } from '../../../models/video/video' import { VideoModel } from '../../../models/video/video'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
import { ActivitypubHttpUnicastPayload } from '@shared/models' import { ActivitypubHttpUnicastPayload } from '@shared/models'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists'
import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos'
import { loadVideoByUrl } from '@server/lib/model-loaders' import { loadVideoByUrl } from '@server/lib/model-loaders'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors'
import { ActorModel } from '@server/models/actor/actor' import { ActorModel } from '@server/models/actor/actor'
import { ActorKeysPayload } from '@shared/models' import { ActorKeysPayload } from '@shared/models'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { EmailPayload } from '@shared/models' import { EmailPayload } from '@shared/models'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { Emailer } from '../../emailer' import { Emailer } from '../../emailer'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file' import { VideoFileModel } from '@server/models/video/video-file'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { remove } from 'fs-extra' import { remove } from 'fs-extra'
import { join } from 'path' import { join } from 'path'
import { logger, loggerTagsFactory } from '@server/helpers/logger' import { logger, loggerTagsFactory } from '@server/helpers/logger'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { copy, stat } from 'fs-extra' import { copy, stat } from 'fs-extra'
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
import { CONFIG } from '@server/initializers/config' import { CONFIG } from '@server/initializers/config'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { move, remove, stat } from 'fs-extra' import { move, remove, stat } from 'fs-extra'
import { retryTransactionWrapper } from '@server/helpers/database-utils' import { retryTransactionWrapper } from '@server/helpers/database-utils'
import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' import { YoutubeDLWrapper } from '@server/helpers/youtube-dl'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { readdir, remove } from 'fs-extra' import { readdir, remove } from 'fs-extra'
import { join } from 'path' import { join } from 'path'
import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler'
import { VideoRedundancyPayload } from '@shared/models' import { VideoRedundancyPayload } from '@shared/models'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { move, remove } from 'fs-extra' import { move, remove } from 'fs-extra'
import { join } from 'path' import { join } from 'path'
import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg'

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
import { Hooks } from '@server/lib/plugins/hooks' import { Hooks } from '@server/lib/plugins/hooks'
import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'

View File

@ -1,7 +1,19 @@
import Bull, { Job, JobOptions, Queue } from 'bull' import {
Job,
JobsOptions,
Queue,
QueueEvents,
QueueEventsOptions,
QueueOptions,
QueueScheduler,
QueueSchedulerOptions,
Worker,
WorkerOptions
} from 'bullmq'
import { jobStates } from '@server/helpers/custom-validators/jobs' import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config' import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
import { timeoutPromise } from '@shared/core-utils'
import { import {
ActivitypubFollowPayload, ActivitypubFollowPayload,
ActivitypubHttpBroadcastPayload, ActivitypubHttpBroadcastPayload,
@ -120,7 +132,11 @@ class JobQueue {
private static instance: JobQueue private static instance: JobQueue
private workers: { [id in JobType]?: Worker } = {}
private queues: { [id in JobType]?: Queue } = {} private queues: { [id in JobType]?: Queue } = {}
private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
private queueEvents: { [id in JobType]?: QueueEvents } = {}
private initialized = false private initialized = false
private jobRedisPrefix: string private jobRedisPrefix: string
@ -134,42 +150,47 @@ class JobQueue {
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
const queueOptions: Bull.QueueOptions = {
prefix: this.jobRedisPrefix,
redis: {
password: CONFIG.REDIS.AUTH,
db: CONFIG.REDIS.DB,
host: CONFIG.REDIS.HOSTNAME,
port: CONFIG.REDIS.PORT,
path: CONFIG.REDIS.SOCKET
},
settings: {
maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
}
}
for (const handlerName of (Object.keys(handlers) as JobType[])) { for (const handlerName of (Object.keys(handlers) as JobType[])) {
const queue = new Bull(handlerName, queueOptions) this.buildWorker(handlerName, produceOnly)
this.buildQueue(handlerName)
if (produceOnly) { this.buildQueueScheduler(handlerName, produceOnly)
queue.pause(true) this.buildQueueEvent(handlerName, produceOnly)
.catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
} }
const handler = handlers[handlerName] this.addRepeatableJobs()
}
queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { private buildWorker (handlerName: JobType, produceOnly: boolean) {
const workerOptions: WorkerOptions = {
autorun: !produceOnly,
concurrency: this.getJobConcurrency(handlerName),
prefix: this.jobRedisPrefix,
connection: this.getRedisConnection()
}
const handler = function (job: Job) {
const timeout = JOB_TTL[handlerName]
const p = handlers[handlerName](job)
if (!timeout) return p
return timeoutPromise(p, timeout)
}
const processor = async (jobArg: Job<any>) => {
const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
}).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) }
queue.on('failed', (job, err) => { const worker = new Worker(handlerName, processor, workerOptions)
worker.on('failed', (job, err) => {
const logLevel = silentFailure.has(handlerName) const logLevel = silentFailure.has(handlerName)
? 'debug' ? 'debug'
: 'error' : 'error'
logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })
if (errorHandlers[job.name]) { if (errorHandlers[job.name]) {
errorHandlers[job.name](job, err) errorHandlers[job.name](job, err)
@ -177,32 +198,83 @@ class JobQueue {
} }
}) })
queue.on('error', err => { worker.on('error', err => {
logger.error('Error in job queue %s.', handlerName, { err }) logger.error('Error in job queue %s.', handlerName, { err })
}) })
this.queues[handlerName] = queue this.workers[handlerName] = worker
} }
this.addRepeatableJobs() private buildQueue (handlerName: JobType) {
const queueOptions: QueueOptions = {
connection: this.getRedisConnection(),
prefix: this.jobRedisPrefix
} }
terminate () { this.queues[handlerName] = new Queue(handlerName, queueOptions)
for (const queueName of Object.keys(this.queues)) {
const queue = this.queues[queueName]
queue.close()
} }
private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
const queueSchedulerOptions: QueueSchedulerOptions = {
autorun: !produceOnly,
connection: this.getRedisConnection(),
prefix: this.jobRedisPrefix,
maxStalledCount: 10
}
this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
}
private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
const queueEventsOptions: QueueEventsOptions = {
autorun: !produceOnly,
connection: this.getRedisConnection(),
prefix: this.jobRedisPrefix
}
this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
}
private getRedisConnection () {
return {
password: CONFIG.REDIS.AUTH,
db: CONFIG.REDIS.DB,
host: CONFIG.REDIS.HOSTNAME,
port: CONFIG.REDIS.PORT,
path: CONFIG.REDIS.SOCKET
}
}
async terminate () {
const promises = Object.keys(this.workers)
.map(handlerName => {
const worker: Worker = this.workers[handlerName]
const queue: Queue = this.queues[handlerName]
const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName]
return Promise.all([
worker.close(false),
queue.close(),
queueScheduler.close(),
queueEvent.close()
])
})
return Promise.all(promises)
} }
async pause () { async pause () {
for (const handler of Object.keys(this.queues)) { for (const handler of Object.keys(this.workers)) {
await this.queues[handler].pause(true) const worker: Worker = this.workers[handler]
await worker.pause()
} }
} }
async resume () { resume () {
for (const handler of Object.keys(this.queues)) { for (const handler of Object.keys(this.workers)) {
await this.queues[handler].resume(true) const worker: Worker = this.workers[handler]
worker.resume()
} }
} }
@ -211,22 +283,21 @@ class JobQueue {
.catch(err => logger.error('Cannot create job.', { err, obj })) .catch(err => logger.error('Cannot create job.', { err, obj }))
} }
createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
const queue: Queue = this.queues[obj.type] const queue: Queue = this.queues[obj.type]
if (queue === undefined) { if (queue === undefined) {
logger.error('Unknown queue %s: cannot create job.', obj.type) logger.error('Unknown queue %s: cannot create job.', obj.type)
return return
} }
const jobArgs: JobOptions = { const jobArgs: JobsOptions = {
backoff: { delay: 60 * 1000, type: 'exponential' }, backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[obj.type], attempts: JOB_ATTEMPTS[obj.type],
timeout: JOB_TTL[obj.type],
priority: options.priority, priority: options.priority,
delay: options.delay delay: options.delay
} }
return queue.add(obj.payload, jobArgs) return queue.add('job', obj.payload, jobArgs)
} }
async listForApi (options: { async listForApi (options: {
@ -244,7 +315,8 @@ class JobQueue {
const filteredJobTypes = this.filterJobTypes(jobType) const filteredJobTypes = this.filterJobTypes(jobType)
for (const jobType of filteredJobTypes) { for (const jobType of filteredJobTypes) {
const queue = this.queues[jobType] const queue: Queue = this.queues[jobType]
if (queue === undefined) { if (queue === undefined) {
logger.error('Unknown queue %s to list jobs.', jobType) logger.error('Unknown queue %s to list jobs.', jobType)
continue continue
@ -297,18 +369,22 @@ class JobQueue {
async removeOldJobs () { async removeOldJobs () {
for (const key of Object.keys(this.queues)) { for (const key of Object.keys(this.queues)) {
const queue = this.queues[key] const queue: Queue = this.queues[key]
await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
} }
} }
waitJob (job: Job) {
return job.waitUntilFinished(this.queueEvents[job.queueName])
}
private addRepeatableJobs () { private addRepeatableJobs () {
this.queues['videos-views-stats'].add({}, { this.queues['videos-views-stats'].add('job', {}, {
repeat: REPEAT_JOBS['videos-views-stats'] repeat: REPEAT_JOBS['videos-views-stats']
}).catch(err => logger.error('Cannot add repeatable job.', { err })) }).catch(err => logger.error('Cannot add repeatable job.', { err }))
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
this.queues['activitypub-cleaner'].add({}, { this.queues['activitypub-cleaner'].add('job', {}, {
repeat: REPEAT_JOBS['activitypub-cleaner'] repeat: REPEAT_JOBS['activitypub-cleaner']
}).catch(err => logger.error('Cannot add repeatable job.', { err })) }).catch(err => logger.error('Cannot add repeatable job.', { err }))
} }

View File

@ -1,4 +1,4 @@
import { Job } from 'bull' import { Job } from 'bullmq'
import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' import { copyFile, ensureDir, move, remove, stat } from 'fs-extra'
import { basename, extname as extnameUtil, join } from 'path' import { basename, extname as extnameUtil, join } from 'path'
import { toEven } from '@server/helpers/core-utils' import { toEven } from '@server/helpers/core-utils'

View File

@ -6,7 +6,20 @@ function isCatchable (value: any) {
return value && typeof value.catch === 'function' return value && typeof value.catch === 'function'
} }
function timeoutPromise <T> (promise: Promise<T>, timeoutMs: number) {
let timer: ReturnType<typeof setTimeout>
return Promise.race([
promise,
new Promise((_res, rej) => {
timer = setTimeout(() => rej(new Error('Timeout')), timeoutMs)
})
]).finally(() => clearTimeout(timer))
}
export { export {
isPromise, isPromise,
isCatchable isCatchable,
timeoutPromise
} }

View File

@ -4,7 +4,7 @@ import { VideoResolution } from '../videos/file/video-resolution.enum'
import { VideoStudioTaskCut } from '../videos/studio' import { VideoStudioTaskCut } from '../videos/studio'
import { SendEmailOptions } from './emailer.model' import { SendEmailOptions } from './emailer.model'
export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused' export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused' | 'waiting-children'
export type JobType = export type JobType =
| 'activitypub-http-unicast' | 'activitypub-http-unicast'
@ -27,8 +27,8 @@ export type JobType =
| 'video-studio-edition' | 'video-studio-edition'
export interface Job { export interface Job {
id: number id: number | string
state: JobState state: JobState | 'unknown'
type: JobType type: JobType
data: any data: any
priority: number priority: number

View File

@ -1959,14 +1959,6 @@
"@types/connect" "*" "@types/connect" "*"
"@types/node" "*" "@types/node" "*"
"@types/bull@^3.15.0":
version "3.15.8"
resolved "https://registry.yarnpkg.com/@types/bull/-/bull-3.15.8.tgz#ae2139f94490d740b37c8da5d828ce75dd82ce7c"
integrity sha512-8DbSPMSsZH5PWPnGEkAZLYgJEH4ghHJNKF7LB6Wr5R0/v6g+Vs+JoaA7kcvLtHE936xg2WpFPkaoaJgExOmKDw==
dependencies:
"@types/ioredis" "*"
"@types/redis" "^2.8.0"
"@types/bytes@^3.0.0": "@types/bytes@^3.0.0":
version "3.1.1" version "3.1.1"
resolved "https://registry.yarnpkg.com/@types/bytes/-/bytes-3.1.1.tgz#67a876422e660dc4c10a27f3e5bcfbd5455f01d0" resolved "https://registry.yarnpkg.com/@types/bytes/-/bytes-3.1.1.tgz#67a876422e660dc4c10a27f3e5bcfbd5455f01d0"
@ -2100,13 +2092,6 @@
resolved "https://registry.yarnpkg.com/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz#0ea7b61496902b95890dc4c3a116b60cb8dae812" resolved "https://registry.yarnpkg.com/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz#0ea7b61496902b95890dc4c3a116b60cb8dae812"
integrity sha512-SZs7ekbP8CN0txVG2xVRH6EgKmEm31BOxA07vkFaETzZz1xh+cbt8BcI0slpymvwhx5dlFnQG2rTlPVQn+iRPQ== integrity sha512-SZs7ekbP8CN0txVG2xVRH6EgKmEm31BOxA07vkFaETzZz1xh+cbt8BcI0slpymvwhx5dlFnQG2rTlPVQn+iRPQ==
"@types/ioredis@*":
version "4.28.10"
resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.28.10.tgz#40ceb157a4141088d1394bb87c98ed09a75a06ff"
integrity sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==
dependencies:
"@types/node" "*"
"@types/json-buffer@~3.0.0": "@types/json-buffer@~3.0.0":
version "3.0.0" version "3.0.0"
resolved "https://registry.yarnpkg.com/@types/json-buffer/-/json-buffer-3.0.0.tgz#85c1ff0f0948fc159810d4b5be35bf8c20875f64" resolved "https://registry.yarnpkg.com/@types/json-buffer/-/json-buffer-3.0.0.tgz#85c1ff0f0948fc159810d4b5be35bf8c20875f64"
@ -2284,13 +2269,6 @@
resolved "https://registry.yarnpkg.com/@types/range-parser/-/range-parser-1.2.4.tgz#cd667bcfdd025213aafb7ca5915a932590acdcdc" resolved "https://registry.yarnpkg.com/@types/range-parser/-/range-parser-1.2.4.tgz#cd667bcfdd025213aafb7ca5915a932590acdcdc"
integrity sha512-EEhsLsD6UsDM1yFhAvy0Cjr6VwmpMWqFBCb9w07wVugF7w9nfajxLuVmngTIpgS6svCnm6Vaw+MZhoDCKnOfsw== integrity sha512-EEhsLsD6UsDM1yFhAvy0Cjr6VwmpMWqFBCb9w07wVugF7w9nfajxLuVmngTIpgS6svCnm6Vaw+MZhoDCKnOfsw==
"@types/redis@^2.8.0":
version "2.8.32"
resolved "https://registry.yarnpkg.com/@types/redis/-/redis-2.8.32.tgz#1d3430219afbee10f8cfa389dad2571a05ecfb11"
integrity sha512-7jkMKxcGq9p242exlbsVzuJb57KqHRhNl4dHoQu2Y5v9bCAbtIXXH0R3HleSQW4CTOqpHIYUW3t6tpUj4BVQ+w==
dependencies:
"@types/node" "*"
"@types/request@^2.0.3": "@types/request@^2.0.3":
version "2.48.8" version "2.48.8"
resolved "https://registry.yarnpkg.com/@types/request/-/request-2.48.8.tgz#0b90fde3b655ab50976cb8c5ac00faca22f5a82c" resolved "https://registry.yarnpkg.com/@types/request/-/request-2.48.8.tgz#0b90fde3b655ab50976cb8c5ac00faca22f5a82c"
@ -3178,20 +3156,20 @@ builtins@^5.0.1:
dependencies: dependencies:
semver "^7.0.0" semver "^7.0.0"
bull@^4.1.0: bullmq@^1.87.0:
version "4.8.4" version "1.87.0"
resolved "https://registry.yarnpkg.com/bull/-/bull-4.8.4.tgz#c538610492050d5160dbd9180704145f135a0aa9" resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.87.0.tgz#e93618302f547239fbb85ee47f7f1f2c3d0c5eef"
integrity sha512-vDNhM/pvfFY3+msulMbqPBdBO7ntKxRZRtMfi3EguVW/Ozo4uez+B81I8ZoDxYCLgSOBfwRuPnFtcv7QNzm4Ew== integrity sha512-oN44FaiWJDviWBNx3V8o4FQBdHrfVHRwJuYvU4HnWpBVdCKd6HMbKqF+XeuuxcqBPbbf7cl6hThoKZ+9iTCOkA==
dependencies: dependencies:
cron-parser "^4.2.1" cron-parser "^4.2.1"
debuglog "^1.0.0"
get-port "^5.1.1" get-port "^5.1.1"
glob "^7.2.0"
ioredis "^4.28.5" ioredis "^4.28.5"
lodash "^4.17.21" lodash "^4.17.21"
msgpackr "^1.5.2" msgpackr "^1.4.6"
p-timeout "^3.2.0" semver "^7.3.7"
semver "^7.3.2" tslib "^1.14.1"
uuid "^8.3.0" uuid "^8.3.2"
busboy@^1.0.0: busboy@^1.0.0:
version "1.6.0" version "1.6.0"
@ -3856,11 +3834,6 @@ debug@^3.2.7:
dependencies: dependencies:
ms "^2.1.1" ms "^2.1.1"
debuglog@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/debuglog/-/debuglog-1.0.1.tgz#aa24ffb9ac3df9a2351837cfb2d279360cd78492"
integrity sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw==
decamelize@^1.2.0: decamelize@^1.2.0:
version "1.2.0" version "1.2.0"
resolved "https://registry.yarnpkg.com/decamelize/-/decamelize-1.2.0.tgz#f6534d15148269b20352e7bee26f501f9a191290" resolved "https://registry.yarnpkg.com/decamelize/-/decamelize-1.2.0.tgz#f6534d15148269b20352e7bee26f501f9a191290"
@ -5169,7 +5142,7 @@ glob@7.2.0:
once "^1.3.0" once "^1.3.0"
path-is-absolute "^1.0.0" path-is-absolute "^1.0.0"
glob@^7.1.3: glob@^7.1.3, glob@^7.2.0:
version "7.2.3" version "7.2.3"
resolved "https://registry.yarnpkg.com/glob/-/glob-7.2.3.tgz#b8df0fb802bbfa8e89bd1d938b4e16578ed44f2b" resolved "https://registry.yarnpkg.com/glob/-/glob-7.2.3.tgz#b8df0fb802bbfa8e89bd1d938b4e16578ed44f2b"
integrity sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q== integrity sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==
@ -6696,10 +6669,10 @@ msgpackr-extract@^2.0.2:
"@msgpackr-extract/msgpackr-extract-linux-x64" "2.0.2" "@msgpackr-extract/msgpackr-extract-linux-x64" "2.0.2"
"@msgpackr-extract/msgpackr-extract-win32-x64" "2.0.2" "@msgpackr-extract/msgpackr-extract-win32-x64" "2.0.2"
msgpackr@^1.5.2: msgpackr@^1.4.6:
version "1.6.1" version "1.6.2"
resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.1.tgz#4f3c94d6a5b819b838ffc736eddaf60eba436d20" resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.2.tgz#176cd9f6b4437dad87a839b37f23c2dfee408d9a"
integrity sha512-Je+xBEfdjtvA4bKaOv8iRhjC8qX2oJwpYH4f7JrG4uMVJVmnmkAT4pjKdbztKprGj3iwjcxPzb5umVZ02Qq3tA== integrity sha512-bqSQ0DYJbXbrJcrZFmMygUZmqQiDfI2ewFVWcrZY12w5XHWtPuW4WppDT/e63Uu311ajwkRRXSoF0uILroBeTA==
optionalDependencies: optionalDependencies:
msgpackr-extract "^2.0.2" msgpackr-extract "^2.0.2"
@ -9054,7 +9027,7 @@ tsconfig-paths@^4.0.0:
minimist "^1.2.6" minimist "^1.2.6"
strip-bom "^3.0.0" strip-bom "^3.0.0"
tslib@^1.11.1, tslib@^1.8.1: tslib@^1.11.1, tslib@^1.14.1, tslib@^1.8.1:
version "1.14.1" version "1.14.1"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00" resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==
@ -9277,7 +9250,7 @@ uuid-parse@^1.1.0:
resolved "https://registry.yarnpkg.com/uuid-parse/-/uuid-parse-1.1.0.tgz#7061c5a1384ae0e1f943c538094597e1b5f3a65b" resolved "https://registry.yarnpkg.com/uuid-parse/-/uuid-parse-1.1.0.tgz#7061c5a1384ae0e1f943c538094597e1b5f3a65b"
integrity sha512-OdmXxA8rDsQ7YpNVbKSJkNzTw2I+S5WsbMDnCtIWSQaosNAcWtFuI/YK1TjzUI6nbkgiqEyh8gWngfcv8Asd9A== integrity sha512-OdmXxA8rDsQ7YpNVbKSJkNzTw2I+S5WsbMDnCtIWSQaosNAcWtFuI/YK1TjzUI6nbkgiqEyh8gWngfcv8Asd9A==
uuid@^8.3.0, uuid@^8.3.2: uuid@^8.3.2:
version "8.3.2" version "8.3.2"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2" resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2"
integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg== integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==