Server: transaction refractoring
This commit is contained in:
parent
98dffd102e
commit
4145c1c689
|
@ -146,27 +146,20 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
|
||||||
video.setTags(tagInstances, options).asCallback(function (err) {
|
video.setTags(tagInstances, options).asCallback(function (err) {
|
||||||
return callback(err, t)
|
return callback(err, t)
|
||||||
})
|
})
|
||||||
}
|
},
|
||||||
|
|
||||||
|
databaseUtils.commitTransaction
|
||||||
|
|
||||||
], function (err, t) {
|
], function (err, t) {
|
||||||
if (err) {
|
if (err) {
|
||||||
// This is just a debug because we will retry the insert
|
// This is just a debug because we will retry the insert
|
||||||
logger.debug('Cannot insert the remote video.', { error: err })
|
logger.debug('Cannot insert the remote video.', { error: err })
|
||||||
|
return databaseUtils.rollbackTransaction(err, t, finalCallback)
|
||||||
// Abort transaction?
|
|
||||||
if (t) t.rollback()
|
|
||||||
|
|
||||||
return finalCallback(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit transaction
|
|
||||||
t.commit().asCallback(function (err) {
|
|
||||||
if (err) return finalCallback(err)
|
|
||||||
|
|
||||||
logger.info('Remote video %s inserted.', videoToCreateData.name)
|
logger.info('Remote video %s inserted.', videoToCreateData.name)
|
||||||
return finalCallback(null)
|
return finalCallback(null)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle retries on fail
|
// Handle retries on fail
|
||||||
|
@ -222,27 +215,20 @@ function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) {
|
||||||
videoInstance.setTags(tagInstances, options).asCallback(function (err) {
|
videoInstance.setTags(tagInstances, options).asCallback(function (err) {
|
||||||
return callback(err, t)
|
return callback(err, t)
|
||||||
})
|
})
|
||||||
}
|
},
|
||||||
|
|
||||||
|
databaseUtils.commitTransaction
|
||||||
|
|
||||||
], function (err, t) {
|
], function (err, t) {
|
||||||
if (err) {
|
if (err) {
|
||||||
// This is just a debug because we will retry the insert
|
// This is just a debug because we will retry the insert
|
||||||
logger.debug('Cannot update the remote video.', { error: err })
|
logger.debug('Cannot update the remote video.', { error: err })
|
||||||
|
return databaseUtils.rollbackTransaction(err, t, finalCallback)
|
||||||
// Abort transaction?
|
|
||||||
if (t) t.rollback()
|
|
||||||
|
|
||||||
return finalCallback(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit transaction
|
|
||||||
t.commit().asCallback(function (err) {
|
|
||||||
if (err) return finalCallback(err)
|
|
||||||
|
|
||||||
logger.info('Remote video %s updated', videoAttributesToUpdate.name)
|
logger.info('Remote video %s updated', videoAttributesToUpdate.name)
|
||||||
return finalCallback(null)
|
return finalCallback(null)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function removeRemoteVideo (videoToRemoveData, fromPod, callback) {
|
function removeRemoteVideo (videoToRemoveData, fromPod, callback) {
|
||||||
|
|
|
@ -120,14 +120,14 @@ function addVideoRetryWrapper (req, res, next) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function addVideo (req, res, videoFile, callback) {
|
function addVideo (req, res, videoFile, finalCallback) {
|
||||||
const videoInfos = req.body
|
const videoInfos = req.body
|
||||||
|
|
||||||
waterfall([
|
waterfall([
|
||||||
|
|
||||||
databaseUtils.startSerializableTransaction,
|
databaseUtils.startSerializableTransaction,
|
||||||
|
|
||||||
function findOrCreateAuthor (t, callbackWaterfall) {
|
function findOrCreateAuthor (t, callback) {
|
||||||
const user = res.locals.oauth.token.User
|
const user = res.locals.oauth.token.User
|
||||||
|
|
||||||
const name = user.username
|
const name = user.username
|
||||||
|
@ -136,19 +136,19 @@ function addVideo (req, res, videoFile, callback) {
|
||||||
const userId = user.id
|
const userId = user.id
|
||||||
|
|
||||||
db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) {
|
db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) {
|
||||||
return callbackWaterfall(err, t, authorInstance)
|
return callback(err, t, authorInstance)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
function findOrCreateTags (t, author, callbackWaterfall) {
|
function findOrCreateTags (t, author, callback) {
|
||||||
const tags = videoInfos.tags
|
const tags = videoInfos.tags
|
||||||
|
|
||||||
db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
|
db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
|
||||||
return callbackWaterfall(err, t, author, tagInstances)
|
return callback(err, t, author, tagInstances)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
function createVideoObject (t, author, tagInstances, callbackWaterfall) {
|
function createVideoObject (t, author, tagInstances, callback) {
|
||||||
const videoData = {
|
const videoData = {
|
||||||
name: videoInfos.name,
|
name: videoInfos.name,
|
||||||
remoteId: null,
|
remoteId: null,
|
||||||
|
@ -160,77 +160,70 @@ function addVideo (req, res, videoFile, callback) {
|
||||||
|
|
||||||
const video = db.Video.build(videoData)
|
const video = db.Video.build(videoData)
|
||||||
|
|
||||||
return callbackWaterfall(null, t, author, tagInstances, video)
|
return callback(null, t, author, tagInstances, video)
|
||||||
},
|
},
|
||||||
|
|
||||||
// Set the videoname the same as the id
|
// Set the videoname the same as the id
|
||||||
function renameVideoFile (t, author, tagInstances, video, callbackWaterfall) {
|
function renameVideoFile (t, author, tagInstances, video, callback) {
|
||||||
const videoDir = constants.CONFIG.STORAGE.VIDEOS_DIR
|
const videoDir = constants.CONFIG.STORAGE.VIDEOS_DIR
|
||||||
const source = path.join(videoDir, videoFile.filename)
|
const source = path.join(videoDir, videoFile.filename)
|
||||||
const destination = path.join(videoDir, video.getVideoFilename())
|
const destination = path.join(videoDir, video.getVideoFilename())
|
||||||
|
|
||||||
fs.rename(source, destination, function (err) {
|
fs.rename(source, destination, function (err) {
|
||||||
if (err) return callbackWaterfall(err)
|
if (err) return callback(err)
|
||||||
|
|
||||||
// This is important in case if there is another attempt
|
// This is important in case if there is another attempt
|
||||||
videoFile.filename = video.getVideoFilename()
|
videoFile.filename = video.getVideoFilename()
|
||||||
return callbackWaterfall(null, t, author, tagInstances, video)
|
return callback(null, t, author, tagInstances, video)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
function insertVideoIntoDB (t, author, tagInstances, video, callbackWaterfall) {
|
function insertVideoIntoDB (t, author, tagInstances, video, callback) {
|
||||||
const options = { transaction: t }
|
const options = { transaction: t }
|
||||||
|
|
||||||
// Add tags association
|
// Add tags association
|
||||||
video.save(options).asCallback(function (err, videoCreated) {
|
video.save(options).asCallback(function (err, videoCreated) {
|
||||||
if (err) return callbackWaterfall(err)
|
if (err) return callback(err)
|
||||||
|
|
||||||
// Do not forget to add Author informations to the created video
|
// Do not forget to add Author informations to the created video
|
||||||
videoCreated.Author = author
|
videoCreated.Author = author
|
||||||
|
|
||||||
return callbackWaterfall(err, t, tagInstances, videoCreated)
|
return callback(err, t, tagInstances, videoCreated)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
function associateTagsToVideo (t, tagInstances, video, callbackWaterfall) {
|
function associateTagsToVideo (t, tagInstances, video, callback) {
|
||||||
const options = { transaction: t }
|
const options = { transaction: t }
|
||||||
|
|
||||||
video.setTags(tagInstances, options).asCallback(function (err) {
|
video.setTags(tagInstances, options).asCallback(function (err) {
|
||||||
video.Tags = tagInstances
|
video.Tags = tagInstances
|
||||||
|
|
||||||
return callbackWaterfall(err, t, video)
|
return callback(err, t, video)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
function sendToFriends (t, video, callbackWaterfall) {
|
function sendToFriends (t, video, callback) {
|
||||||
video.toAddRemoteJSON(function (err, remoteVideo) {
|
video.toAddRemoteJSON(function (err, remoteVideo) {
|
||||||
if (err) return callbackWaterfall(err)
|
if (err) return callback(err)
|
||||||
|
|
||||||
// Now we'll add the video's meta data to our friends
|
// Now we'll add the video's meta data to our friends
|
||||||
friends.addVideoToFriends(remoteVideo, t, function (err) {
|
friends.addVideoToFriends(remoteVideo, t, function (err) {
|
||||||
return callbackWaterfall(err, t)
|
return callback(err, t)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
},
|
||||||
|
|
||||||
|
databaseUtils.commitTransaction
|
||||||
|
|
||||||
], function andFinally (err, t) {
|
], function andFinally (err, t) {
|
||||||
if (err) {
|
if (err) {
|
||||||
// This is just a debug because we will retry the insert
|
// This is just a debug because we will retry the insert
|
||||||
logger.debug('Cannot insert the video.', { error: err })
|
logger.debug('Cannot insert the video.', { error: err })
|
||||||
|
return databaseUtils.rollbackTransaction(err, t, finalCallback)
|
||||||
// Abort transaction?
|
|
||||||
if (t) t.rollback()
|
|
||||||
|
|
||||||
return callback(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit transaction
|
|
||||||
t.commit().asCallback(function (err) {
|
|
||||||
if (err) return callback(err)
|
|
||||||
|
|
||||||
logger.info('Video with name %s created.', videoInfos.name)
|
logger.info('Video with name %s created.', videoInfos.name)
|
||||||
return callback(null)
|
return finalCallback(null)
|
||||||
})
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -301,15 +294,14 @@ function updateVideo (req, res, finalCallback) {
|
||||||
friends.updateVideoToFriends(json, t, function (err) {
|
friends.updateVideoToFriends(json, t, function (err) {
|
||||||
return callback(err, t)
|
return callback(err, t)
|
||||||
})
|
})
|
||||||
}
|
},
|
||||||
|
|
||||||
|
databaseUtils.commitTransaction
|
||||||
|
|
||||||
], function andFinally (err, t) {
|
], function andFinally (err, t) {
|
||||||
if (err) {
|
if (err) {
|
||||||
logger.debug('Cannot update the video.', { error: err })
|
logger.debug('Cannot update the video.', { error: err })
|
||||||
|
|
||||||
// Abort transaction?
|
|
||||||
if (t) t.rollback()
|
|
||||||
|
|
||||||
// Force fields we want to update
|
// Force fields we want to update
|
||||||
// If the transaction is retried, sequelize will think the object has not changed
|
// If the transaction is retried, sequelize will think the object has not changed
|
||||||
// So it will skip the SQL request, even if the last one was ROLLBACKed!
|
// So it will skip the SQL request, even if the last one was ROLLBACKed!
|
||||||
|
@ -318,17 +310,12 @@ function updateVideo (req, res, finalCallback) {
|
||||||
videoInstance.set(key, value)
|
videoInstance.set(key, value)
|
||||||
})
|
})
|
||||||
|
|
||||||
return finalCallback(err)
|
return databaseUtils.rollbackTransaction(err, t, finalCallback)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit transaction
|
|
||||||
t.commit().asCallback(function (err) {
|
|
||||||
if (err) return finalCallback(err)
|
|
||||||
|
|
||||||
logger.info('Video with name %s updated.', videoInfosToUpdate.name)
|
logger.info('Video with name %s updated.', videoInfosToUpdate.name)
|
||||||
return finalCallback(null)
|
return finalCallback(null)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function getVideo (req, res, next) {
|
function getVideo (req, res, next) {
|
||||||
|
@ -427,25 +414,18 @@ function reportVideoAbuse (req, res, finalCallback) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return callback(null, t)
|
return callback(null, t)
|
||||||
}
|
},
|
||||||
|
|
||||||
|
databaseUtils.commitTransaction
|
||||||
|
|
||||||
], function andFinally (err, t) {
|
], function andFinally (err, t) {
|
||||||
if (err) {
|
if (err) {
|
||||||
logger.debug('Cannot update the video.', { error: err })
|
logger.debug('Cannot update the video.', { error: err })
|
||||||
|
return databaseUtils.rollbackTransaction(err, t, finalCallback)
|
||||||
// Abort transaction?
|
|
||||||
if (t) t.rollback()
|
|
||||||
|
|
||||||
return finalCallback(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit transaction
|
|
||||||
t.commit().asCallback(function (err) {
|
|
||||||
if (err) return finalCallback(err)
|
|
||||||
|
|
||||||
logger.info('Abuse report for video %s created.', videoInstance.name)
|
logger.info('Abuse report for video %s created.', videoInstance.name)
|
||||||
return finalCallback(null)
|
return finalCallback(null)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,9 +6,27 @@ const db = require('../initializers/database')
|
||||||
const logger = require('./logger')
|
const logger = require('./logger')
|
||||||
|
|
||||||
const utils = {
|
const utils = {
|
||||||
|
commitTransaction,
|
||||||
retryTransactionWrapper,
|
retryTransactionWrapper,
|
||||||
transactionRetryer,
|
rollbackTransaction,
|
||||||
startSerializableTransaction
|
startSerializableTransaction,
|
||||||
|
transactionRetryer
|
||||||
|
}
|
||||||
|
|
||||||
|
function commitTransaction (t, callback) {
|
||||||
|
return t.commit().asCallback(callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
function rollbackTransaction (err, t, callback) {
|
||||||
|
// Try to rollback transaction
|
||||||
|
if (t) {
|
||||||
|
// Do not catch err, report the original one
|
||||||
|
t.rollback().asCallback(function () {
|
||||||
|
return callback(err)
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
return callback(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// { arguments, errorMessage }
|
// { arguments, errorMessage }
|
||||||
|
|
Loading…
Reference in New Issue