Server: try to have a better video integrity

This commit is contained in:
Chocobozzz 2017-01-06 23:24:47 +01:00
parent bb0b243c92
commit ed04d94f6d
6 changed files with 194 additions and 71 deletions

View File

@ -10,6 +10,7 @@ const secureMiddleware = middlewares.secure
const videosValidators = middlewares.validators.remote.videos const videosValidators = middlewares.validators.remote.videos
const signatureValidators = middlewares.validators.remote.signature const signatureValidators = middlewares.validators.remote.signature
const logger = require('../../../helpers/logger') const logger = require('../../../helpers/logger')
const utils = require('../../../helpers/utils')
const router = express.Router() const router = express.Router()
@ -37,11 +38,11 @@ function remoteVideos (req, res, next) {
switch (request.type) { switch (request.type) {
case 'add': case 'add':
addRemoteVideo(data, fromPod, callbackEach) addRemoteVideoRetryWrapper(data, fromPod, callbackEach)
break break
case 'update': case 'update':
updateRemoteVideo(data, fromPod, callbackEach) updateRemoteVideoRetryWrapper(data, fromPod, callbackEach)
break break
case 'remove': case 'remove':
@ -63,13 +64,30 @@ function remoteVideos (req, res, next) {
return res.type('json').status(204).end() return res.type('json').status(204).end()
} }
// Handle retries on fail
function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) {
utils.transactionRetryer(
function (callback) {
return addRemoteVideo(videoToCreateData, fromPod, callback)
},
function (err) {
if (err) {
logger.error('Cannot insert the remote video with many retries.', { error: err })
return finalCallback(err)
}
return finalCallback()
}
)
}
function addRemoteVideo (videoToCreateData, fromPod, finalCallback) { function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
logger.debug('Adding remote video "%s".', videoToCreateData.name) logger.debug('Adding remote video "%s".', videoToCreateData.remoteId)
waterfall([ waterfall([
function startTransaction (callback) { function startTransaction (callback) {
db.sequelize.transaction().asCallback(function (err, t) { db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) {
return callback(err, t) return callback(err, t)
}) })
}, },
@ -103,6 +121,7 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
authorId: author.id, authorId: author.id,
duration: videoToCreateData.duration, duration: videoToCreateData.duration,
createdAt: videoToCreateData.createdAt, createdAt: videoToCreateData.createdAt,
// FIXME: updatedAt does not seems to be considered by Sequelize
updatedAt: videoToCreateData.updatedAt updatedAt: videoToCreateData.updatedAt
} }
@ -142,7 +161,8 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
], function (err, t) { ], function (err, t) {
if (err) { if (err) {
logger.error('Cannot insert the remote video.') // This is just a debug because we will retry the insert
logger.debug('Cannot insert the remote video.', { error: err })
// Abort transaction? // Abort transaction?
if (t) t.rollback() if (t) t.rollback()
@ -157,8 +177,25 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
}) })
} }
// Handle retries on fail
function updateRemoteVideoRetryWrapper (videoAttributesToUpdate, fromPod, finalCallback) {
utils.transactionRetryer(
function (callback) {
return updateRemoteVideo(videoAttributesToUpdate, fromPod, callback)
},
function (err) {
if (err) {
logger.error('Cannot update the remote video with many retries.', { error: err })
return finalCallback(err)
}
return finalCallback()
}
)
}
function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) { function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) {
logger.debug('Updating remote video "%s".', videoAttributesToUpdate.name) logger.debug('Updating remote video "%s".', videoAttributesToUpdate.remoteId)
waterfall([ waterfall([
@ -208,7 +245,8 @@ function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) {
], function (err, t) { ], function (err, t) {
if (err) { if (err) {
logger.error('Cannot update the remote video.') // This is just a debug because we will retry the insert
logger.debug('Cannot update the remote video.', { error: err })
// Abort transaction? // Abort transaction?
if (t) t.rollback() if (t) t.rollback()
@ -238,7 +276,7 @@ function reportAbuseRemoteVideo (reportData, fromPod, callback) {
if (err || !video) { if (err || !video) {
if (!err) err = new Error('video not found') if (!err) err = new Error('video not found')
logger.error('Cannot load video from host and remote id.', { error: err }) logger.error('Cannot load video from id.', { error: err, id: reportData.videoRemoteId })
return callback(err) return callback(err)
} }
@ -260,7 +298,7 @@ function fetchVideo (podHost, remoteId, callback) {
if (err || !video) { if (err || !video) {
if (!err) err = new Error('video not found') if (!err) err = new Error('video not found')
logger.error('Cannot load video from host and remote id.', { error: err }) logger.error('Cannot load video from host and remote id.', { error: err, podHost, remoteId })
return callback(err) return callback(err)
} }

View File

@ -70,13 +70,13 @@ router.put('/:id',
oAuth.authenticate, oAuth.authenticate,
reqFiles, reqFiles,
validatorsVideos.videosUpdate, validatorsVideos.videosUpdate,
updateVideo updateVideoRetryWrapper
) )
router.post('/', router.post('/',
oAuth.authenticate, oAuth.authenticate,
reqFiles, reqFiles,
validatorsVideos.videosAdd, validatorsVideos.videosAdd,
addVideo addVideoRetryWrapper
) )
router.get('/:id', router.get('/:id',
validatorsVideos.videosGet, validatorsVideos.videosGet,
@ -103,19 +103,37 @@ module.exports = router
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function addVideo (req, res, next) { // Wrapper to video add that retry the function if there is a database error
const videoFile = req.files.videofile[0] // We need this because we run the transaction in SERIALIZABLE isolation that can fail
function addVideoRetryWrapper (req, res, next) {
utils.transactionRetryer(
function (callback) {
return addVideo(req, res, req.files.videofile[0], callback)
},
function (err) {
if (err) {
logger.error('Cannot insert the video with many retries.', { error: err })
return next(err)
}
// TODO : include Location of the new video -> 201
return res.type('json').status(204).end()
}
)
}
function addVideo (req, res, videoFile, callback) {
const videoInfos = req.body const videoInfos = req.body
waterfall([ waterfall([
function startTransaction (callback) { function startTransaction (callbackWaterfall) {
db.sequelize.transaction().asCallback(function (err, t) { db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) {
return callback(err, t) return callbackWaterfall(err, t)
}) })
}, },
function findOrCreateAuthor (t, callback) { function findOrCreateAuthor (t, callbackWaterfall) {
const user = res.locals.oauth.token.User const user = res.locals.oauth.token.User
const name = user.username const name = user.username
@ -124,19 +142,19 @@ function addVideo (req, res, next) {
const userId = user.id const userId = user.id
db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) { db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) {
return callback(err, t, authorInstance) return callbackWaterfall(err, t, authorInstance)
}) })
}, },
function findOrCreateTags (t, author, callback) { function findOrCreateTags (t, author, callbackWaterfall) {
const tags = videoInfos.tags const tags = videoInfos.tags
db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) { db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
return callback(err, t, author, tagInstances) return callbackWaterfall(err, t, author, tagInstances)
}) })
}, },
function createVideoObject (t, author, tagInstances, callback) { function createVideoObject (t, author, tagInstances, callbackWaterfall) {
const videoData = { const videoData = {
name: videoInfos.name, name: videoInfos.name,
remoteId: null, remoteId: null,
@ -148,74 +166,97 @@ function addVideo (req, res, next) {
const video = db.Video.build(videoData) const video = db.Video.build(videoData)
return callback(null, t, author, tagInstances, video) return callbackWaterfall(null, t, author, tagInstances, video)
}, },
// Set the videoname the same as the id // Set the videoname the same as the id
function renameVideoFile (t, author, tagInstances, video, callback) { function renameVideoFile (t, author, tagInstances, video, callbackWaterfall) {
const videoDir = constants.CONFIG.STORAGE.VIDEOS_DIR const videoDir = constants.CONFIG.STORAGE.VIDEOS_DIR
const source = path.join(videoDir, videoFile.filename) const source = path.join(videoDir, videoFile.filename)
const destination = path.join(videoDir, video.getVideoFilename()) const destination = path.join(videoDir, video.getVideoFilename())
fs.rename(source, destination, function (err) { fs.rename(source, destination, function (err) {
return callback(err, t, author, tagInstances, video) if (err) return callbackWaterfall(err)
// This is important in case if there is another attempt
videoFile.filename = video.getVideoFilename()
return callbackWaterfall(null, t, author, tagInstances, video)
}) })
}, },
function insertVideoIntoDB (t, author, tagInstances, video, callback) { function insertVideoIntoDB (t, author, tagInstances, video, callbackWaterfall) {
const options = { transaction: t } const options = { transaction: t }
// Add tags association // Add tags association
video.save(options).asCallback(function (err, videoCreated) { video.save(options).asCallback(function (err, videoCreated) {
if (err) return callback(err) if (err) return callbackWaterfall(err)
// Do not forget to add Author informations to the created video // Do not forget to add Author informations to the created video
videoCreated.Author = author videoCreated.Author = author
return callback(err, t, tagInstances, videoCreated) return callbackWaterfall(err, t, tagInstances, videoCreated)
}) })
}, },
function associateTagsToVideo (t, tagInstances, video, callback) { function associateTagsToVideo (t, tagInstances, video, callbackWaterfall) {
const options = { transaction: t } const options = { transaction: t }
video.setTags(tagInstances, options).asCallback(function (err) { video.setTags(tagInstances, options).asCallback(function (err) {
video.Tags = tagInstances video.Tags = tagInstances
return callback(err, t, video) return callbackWaterfall(err, t, video)
}) })
}, },
function sendToFriends (t, video, callback) { function sendToFriends (t, video, callbackWaterfall) {
video.toAddRemoteJSON(function (err, remoteVideo) { video.toAddRemoteJSON(function (err, remoteVideo) {
if (err) return callback(err) if (err) return callbackWaterfall(err)
// Now we'll add the video's meta data to our friends // Now we'll add the video's meta data to our friends
friends.addVideoToFriends(remoteVideo) friends.addVideoToFriends(remoteVideo, t, function (err) {
return callbackWaterfall(err, t)
return callback(null, t) })
}) })
} }
], function andFinally (err, t) { ], function andFinally (err, t) {
if (err) { if (err) {
logger.error('Cannot insert the video.') // This is just a debug because we will retry the insert
logger.debug('Cannot insert the video.', { error: err })
// Abort transaction? // Abort transaction?
if (t) t.rollback() if (t) t.rollback()
return next(err) return callback(err)
} }
// Commit transaction // Commit transaction
t.commit() t.commit()
// TODO : include Location of the new video -> 201 logger.info('Video with name %s created.', videoInfos.name)
return res.type('json').status(204).end()
return callback(null)
}) })
} }
function updateVideo (req, res, next) { function updateVideoRetryWrapper (req, res, next) {
utils.transactionRetryer(
function (callback) {
return updateVideo(req, res, callback)
},
function (err) {
if (err) {
logger.error('Cannot update the video with many retries.', { error: err })
return next(err)
}
// TODO : include Location of the new video -> 201
return res.type('json').status(204).end()
}
)
}
function updateVideo (req, res, finalCallback) {
const videoInstance = res.locals.video const videoInstance = res.locals.video
const videoInfosToUpdate = req.body const videoInfosToUpdate = req.body
@ -267,26 +308,25 @@ function updateVideo (req, res, next) {
const json = videoInstance.toUpdateRemoteJSON() const json = videoInstance.toUpdateRemoteJSON()
// Now we'll update the video's meta data to our friends // Now we'll update the video's meta data to our friends
friends.updateVideoToFriends(json) friends.updateVideoToFriends(json, t, function (err) {
return callback(err, t)
return callback(null, t) })
} }
], function andFinally (err, t) { ], function andFinally (err, t) {
if (err) { if (err) {
logger.error('Cannot insert the video.') logger.debug('Cannot update the video.', { error: err })
// Abort transaction? // Abort transaction?
if (t) t.rollback() if (t) t.rollback()
return next(err) return finalCallback(err)
} }
// Commit transaction // Commit transaction
t.commit() t.commit()
// TODO : include Location of the new video -> 201 return finalCallback(null)
return res.type('json').status(204).end()
}) })
} }

View File

@ -1,6 +1,7 @@
'use strict' 'use strict'
const crypto = require('crypto') const crypto = require('crypto')
const retry = require('async/retry')
const logger = require('./logger') const logger = require('./logger')
@ -9,7 +10,8 @@ const utils = {
cleanForExit, cleanForExit,
generateRandomString, generateRandomString,
isTestInstance, isTestInstance,
getFormatedObjects getFormatedObjects,
transactionRetryer
} }
function badRequest (req, res, next) { function badRequest (req, res, next) {
@ -46,6 +48,18 @@ function getFormatedObjects (objects, objectsTotal) {
} }
} }
function transactionRetryer (func, callback) {
retry({
times: 5,
errorFilter: function (err) {
const willRetry = (err.name === 'SequelizeDatabaseError')
logger.debug('Maybe retrying the transaction function.', { willRetry })
return willRetry
}
}, func, callback)
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
module.exports = utils module.exports = utils

View File

@ -24,16 +24,33 @@ const friends = {
sendOwnedVideosToPod sendOwnedVideosToPod
} }
function addVideoToFriends (videoData) { function addVideoToFriends (videoData, transaction, callback) {
createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, videoData) const options = {
type: 'add',
endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
data: videoData,
transaction
}
createRequest(options, callback)
} }
function updateVideoToFriends (videoData) { function updateVideoToFriends (videoData, transaction, callback) {
createRequest('update', constants.REQUEST_ENDPOINTS.VIDEOS, videoData) const options = {
type: 'update',
endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
data: videoData,
transaction
}
createRequest(options, callback)
} }
function removeVideoToFriends (videoParams) { function removeVideoToFriends (videoParams) {
createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams) const options = {
type: 'remove',
endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
data: videoParams
}
createRequest(options)
} }
function reportAbuseVideoToFriend (reportData, video) { function reportAbuseVideoToFriend (reportData, video) {
@ -258,25 +275,35 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
} }
// Wrapper that populate "toIds" argument with all our friends if it is not specified // Wrapper that populate "toIds" argument with all our friends if it is not specified
function createRequest (type, endpoint, data, toIds) { // { type, endpoint, data, toIds, transaction }
if (toIds) return _createRequest(type, endpoint, data, toIds) function createRequest (options, callback) {
if (!callback) callback = function () {}
if (options.toIds) return _createRequest(options, callback)
// If the "toIds" pods is not specified, we send the request to all our friends // If the "toIds" pods is not specified, we send the request to all our friends
db.Pod.listAllIds(function (err, podIds) { db.Pod.listAllIds(options.transaction, function (err, podIds) {
if (err) { if (err) {
logger.error('Cannot get pod ids', { error: err }) logger.error('Cannot get pod ids', { error: err })
return return
} }
return _createRequest(type, endpoint, data, podIds) const newOptions = Object.assign(options, { toIds: podIds })
return _createRequest(newOptions, callback)
}) })
} }
function _createRequest (type, endpoint, data, toIds) { // { type, endpoint, data, toIds, transaction }
function _createRequest (options, callback) {
const type = options.type
const endpoint = options.endpoint
const data = options.data
const toIds = options.toIds
const transaction = options.transaction
const pods = [] const pods = []
// If there are no destination pods abort // If there are no destination pods abort
if (toIds.length === 0) return if (toIds.length === 0) return callback(null)
toIds.forEach(function (toPod) { toIds.forEach(function (toPod) {
pods.push(db.Pod.build({ id: toPod })) pods.push(db.Pod.build({ id: toPod }))
@ -290,17 +317,14 @@ function _createRequest (type, endpoint, data, toIds) {
} }
} }
// We run in transaction to keep coherency between Request and RequestToPod tables const dbRequestOptions = {
db.sequelize.transaction(function (t) { transaction
const dbRequestOptions = { }
transaction: t
}
return db.Request.create(createQuery, dbRequestOptions).then(function (request) { return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
return request.setPods(pods, dbRequestOptions) if (err) return callback(err)
})
}).asCallback(function (err) { return request.setPods(pods, dbRequestOptions).asCallback(callback)
if (err) logger.error('Error in createRequest transaction.', { error: err })
}) })
} }

View File

@ -115,11 +115,18 @@ function list (callback) {
return this.findAll().asCallback(callback) return this.findAll().asCallback(callback)
} }
function listAllIds (callback) { function listAllIds (transaction, callback) {
if (!callback) {
callback = transaction
transaction = null
}
const query = { const query = {
attributes: [ 'id' ] attributes: [ 'id' ]
} }
if (transaction) query.transaction = transaction
return this.findAll(query).asCallback(function (err, pods) { return this.findAll(query).asCallback(function (err, pods) {
if (err) return callback(err) if (err) return callback(err)

View File

@ -291,8 +291,8 @@ function listWithLimitAndRandom (limit, callback) {
order: [ order: [
[ 'id', 'ASC' ] [ 'id', 'ASC' ]
], ],
offset: start, // offset: start,
limit: limit, // limit: limit,
include: [ this.sequelize.models.Pod ] include: [ this.sequelize.models.Pod ]
} }