Server: add job scheduler to transcode video files

This commit is contained in:
Chocobozzz 2017-05-02 22:02:27 +02:00
parent 15d4ee04a9
commit 227d02fead
11 changed files with 340 additions and 23 deletions

View File

@ -28,3 +28,9 @@ admin:
signup:
enabled: false
# If enabled, the video will be transcoded to mp4 (x264) with "faststart" flag
# Uses a lot of CPU!
transcoding:
enabled: true
threads: 2

View File

@ -29,3 +29,9 @@ admin:
signup:
enabled: false
# If enabled, the video will be transcoded to mp4 (x264) with "faststart" flag
# Uses a lot of CPU!
transcoding:
enabled: false
threads: 2

View File

@ -18,3 +18,6 @@ storage:
admin:
email: 'admin6@example.com'
transcoding:
enabled: true

View File

@ -40,6 +40,7 @@ const customValidators = require('./server/helpers/custom-validators')
const friends = require('./server/lib/friends')
const installer = require('./server/initializers/installer')
const migrator = require('./server/initializers/migrator')
const jobScheduler = require('./server/lib/jobs/job-scheduler')
const routes = require('./server/controllers')
// ----------- Command line -----------
@ -133,6 +134,9 @@ function onDatabaseInitDone () {
// Activate the communication with friends
friends.activate()
// Activate job scheduler
jobScheduler.activate()
logger.info('Server listening on port %d', port)
logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL)

View File

@ -29,7 +29,7 @@ function checkMissedConfig () {
'webserver.https', 'webserver.hostname', 'webserver.port',
'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password',
'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews',
'admin.email', 'signup.enabled'
'admin.email', 'signup.enabled', 'transcoding.enabled', 'transcoding.threads'
]
const miss = []

View File

@ -64,6 +64,10 @@ const CONFIG = {
},
SIGNUP: {
ENABLED: config.get('signup.enabled')
},
TRANSCODING: {
ENABLED: config.get('transcoding.enabled'),
THREADS: config.get('transcoding.threads')
}
}
CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT
@ -223,6 +227,18 @@ const REMOTE_SCHEME = {
WS: 'wss'
}
const JOB_STATES = {
PENDING: 'pending',
PROCESSING: 'processing',
ERROR: 'error',
SUCCESS: 'success'
}
// How many maximum jobs we fetch from the database per cycle
const JOBS_FETCH_LIMIT_PER_CYCLE = 10
const JOBS_CONCURRENCY = 1
// 1 minutes
let JOBS_FETCHING_INTERVAL = 60000
// ---------------------------------------------------------------------------
const PRIVATE_CERT_NAME = 'peertube.key.pem'
@ -264,6 +280,7 @@ if (isTestInstance() === true) {
CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14
FRIEND_SCORE.BASE = 20
REQUESTS_INTERVAL = 10000
JOBS_FETCHING_INTERVAL = 10000
REMOTE_SCHEME.HTTP = 'http'
REMOTE_SCHEME.WS = 'ws'
STATIC_MAX_AGE = 0
@ -277,6 +294,10 @@ module.exports = {
CONFIG,
CONSTRAINTS_FIELDS,
FRIEND_SCORE,
JOBS_FETCHING_INTERVAL,
JOB_STATES,
JOBS_CONCURRENCY,
JOBS_FETCH_LIMIT_PER_CYCLE,
LAST_MIGRATION_VERSION,
OAUTH_LIFETIME,
PAGINATION_COUNT_DEFAULT,

View File

@ -0,0 +1,7 @@
'use strict'
const videoTranscoder = require('./video-transcoder')
module.exports = {
videoTranscoder
}

View File

@ -0,0 +1,34 @@
'use strict'
const db = require('../../../initializers/database')
const logger = require('../../../helpers/logger')
const VideoTranscoderHandler = {
process,
onError,
onSuccess
}
// ---------------------------------------------------------------------------
function process (data, callback) {
db.Video.load(data.id, function (err, video) {
if (err) return callback(err)
video.transcodeVideofile(callback)
})
}
function onError (err, jobId, callback) {
logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
return callback()
}
function onSuccess (data, jobId, callback) {
logger.info('Job %d is a success.', jobId)
return callback()
}
// ---------------------------------------------------------------------------
module.exports = VideoTranscoderHandler

View File

@ -0,0 +1,110 @@
'use strict'
const forever = require('async/forever')
const queue = require('async/queue')
const constants = require('../../initializers/constants')
const db = require('../../initializers/database')
const logger = require('../../helpers/logger')
const jobHandlers = require('./handlers')
const jobScheduler = {
activate,
createJob
}
function activate () {
logger.info('Jobs scheduler activated.')
const jobsQueue = queue(processJob)
forever(
function (next) {
if (jobsQueue.length() !== 0) {
// Finish processing the queue first
return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
}
db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
if (err) {
logger.error('Cannot list pending jobs.', { error: err })
} else {
jobs.forEach(function (job) {
jobsQueue.push(job)
})
}
// Optimization: we could use "drain" from queue object
return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
})
}
)
}
// ---------------------------------------------------------------------------
module.exports = jobScheduler
// ---------------------------------------------------------------------------
function createJob (transaction, handlerName, handlerInputData, callback) {
const createQuery = {
state: constants.JOB_STATES.PENDING,
handlerName,
handlerInputData
}
const options = { transaction }
db.Job.create(createQuery, options).asCallback(callback)
}
function processJob (job, callback) {
const jobHandler = jobHandlers[job.handlerName]
logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
job.state = constants.JOB_STATES.PROCESSING
job.save().asCallback(function (err) {
if (err) return cannotSaveJobError(err, callback)
if (jobHandler === undefined) {
logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
return callback()
}
return jobHandler.process(job.handlerInputData, function (err, result) {
if (err) {
logger.error('Error in job handler %s.', job.handlerName, { error: err })
return onJobError(jobHandler, job, callback)
}
return onJobSuccess(jobHandler, job, callback)
})
})
}
function onJobError (jobHandler, job, callback) {
job.state = constants.JOB_STATES.ERROR
job.save().asCallback(function (err) {
if (err) return cannotSaveJobError(err, callback)
return jobHandler.onError(err, job.id, callback)
})
}
function onJobSuccess (jobHandler, job, callback) {
job.state = constants.JOB_STATES.SUCCESS
job.save().asCallback(function (err) {
if (err) return cannotSaveJobError(err, callback)
return jobHandler.onSuccess(err, job.id, callback)
})
}
function cannotSaveJobError (err, callback) {
logger.error('Cannot save new job state.', { error: err })
return callback(err)
}

54
server/models/job.js Normal file
View File

@ -0,0 +1,54 @@
'use strict'
const values = require('lodash/values')
const constants = require('../initializers/constants')
// ---------------------------------------------------------------------------
module.exports = function (sequelize, DataTypes) {
const Job = sequelize.define('Job',
{
state: {
type: DataTypes.ENUM(values(constants.JOB_STATES)),
allowNull: false
},
handlerName: {
type: DataTypes.STRING,
allowNull: false
},
handlerInputData: {
type: DataTypes.JSON,
allowNull: true
}
},
{
indexes: [
{
fields: [ 'state' ]
}
],
classMethods: {
listWithLimit
}
}
)
return Job
}
// ---------------------------------------------------------------------------
function listWithLimit (limit, callback) {
const query = {
order: [
[ 'id', 'ASC' ]
],
limit: limit,
where: {
state: constants.JOB_STATES.PENDING
}
}
return this.findAll(query).asCallback(callback)
}

View File

@ -7,6 +7,7 @@ const fs = require('fs')
const magnetUtil = require('magnet-uri')
const map = require('lodash/map')
const parallel = require('async/parallel')
const series = require('async/series')
const parseTorrent = require('parse-torrent')
const pathUtils = require('path')
const values = require('lodash/values')
@ -17,6 +18,7 @@ const friends = require('../lib/friends')
const modelUtils = require('./utils')
const customVideosValidators = require('../helpers/custom-validators').videos
const db = require('../initializers/database')
const jobScheduler = require('../lib/jobs/job-scheduler')
// ---------------------------------------------------------------------------
@ -203,6 +205,7 @@ module.exports = function (sequelize, DataTypes) {
toFormatedJSON,
toAddRemoteJSON,
toUpdateRemoteJSON,
transcodeVideofile,
removeFromBlacklist
},
hooks: {
@ -234,38 +237,30 @@ function beforeCreate (video, options, next) {
tasks.push(
function createVideoTorrent (callback) {
const options = {
announceList: [
[ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
],
urlList: [
constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
]
}
createTorrent(videoPath, options, function (err, torrent) {
if (err) return callback(err)
const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
fs.writeFile(filePath, torrent, function (err) {
if (err) return callback(err)
const parsedTorrent = parseTorrent(torrent)
video.set('infoHash', parsedTorrent.infoHash)
video.validate().asCallback(callback)
})
})
createTorrentFromVideo(video, videoPath, callback)
},
function createVideoThumbnail (callback) {
createThumbnail(video, videoPath, callback)
},
function createVIdeoPreview (callback) {
function createVideoPreview (callback) {
createPreview(video, videoPath, callback)
}
)
if (constants.CONFIG.TRANSCODING.ENABLED === true) {
tasks.push(
function createVideoTranscoderJob (callback) {
const dataInput = {
id: video.id
}
jobScheduler.createJob(options.transaction, 'videoTranscoder', dataInput, callback)
}
)
}
return parallel(tasks, next)
}
@ -503,6 +498,59 @@ function toUpdateRemoteJSON (callback) {
return json
}
function transcodeVideofile (finalCallback) {
const video = this
const videosDirectory = constants.CONFIG.STORAGE.VIDEOS_DIR
const newExtname = '.mp4'
const videoInputPath = pathUtils.join(videosDirectory, video.getVideoFilename())
const videoOutputPath = pathUtils.join(videosDirectory, video.id + '-transcoded' + newExtname)
ffmpeg(videoInputPath)
.output(videoOutputPath)
.videoCodec('libx264')
.outputOption('-threads ' + constants.CONFIG.TRANSCODING.THREADS)
.outputOption('-movflags faststart')
.on('error', finalCallback)
.on('end', function () {
series([
function removeOldFile (callback) {
fs.unlink(videoInputPath, callback)
},
function moveNewFile (callback) {
// Important to do this before getVideoFilename() to take in account the new file extension
video.set('extname', newExtname)
const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
fs.rename(videoOutputPath, newVideoPath, callback)
},
function torrent (callback) {
const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
createTorrentFromVideo(video, newVideoPath, callback)
},
function videoExtension (callback) {
video.save().asCallback(callback)
}
], function (err) {
if (err) {
// Autodescruction...
video.destroy().asCallback(function (err) {
if (err) logger.error('Cannot destruct video after transcoding failure.', { error: err })
})
return finalCallback(err)
}
return finalCallback(null)
})
})
.run()
}
// ------------------------------ STATICS ------------------------------
function generateThumbnailFromData (video, thumbnailData, callback) {
@ -737,6 +785,30 @@ function removePreview (video, callback) {
fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback)
}
function createTorrentFromVideo (video, videoPath, callback) {
const options = {
announceList: [
[ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
],
urlList: [
constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
]
}
createTorrent(videoPath, options, function (err, torrent) {
if (err) return callback(err)
const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
fs.writeFile(filePath, torrent, function (err) {
if (err) return callback(err)
const parsedTorrent = parseTorrent(torrent)
video.set('infoHash', parsedTorrent.infoHash)
video.validate().asCallback(callback)
})
})
}
function createPreview (video, videoPath, callback) {
generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback)
}