Server: make a basic "quick and dirty update" for videos

This system will be useful to to update some int video attributes
(likes, dislikes, views...)

The classic system is not used because we need some optimization for
scaling
This commit is contained in:
Chocobozzz 2017-02-21 21:35:59 +01:00
parent 0150b17e51
commit 9e167724f7
19 changed files with 797 additions and 217 deletions

View File

@ -31,6 +31,13 @@ router.post('/',
remoteVideos remoteVideos
) )
router.post('/qadu',
signatureValidators.signature,
secureMiddleware.checkSignature,
videosValidators.remoteQaduVideos,
remoteVideosQadu
)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
module.exports = router module.exports = router
@ -62,6 +69,73 @@ function remoteVideos (req, res, next) {
return res.type('json').status(204).end() return res.type('json').status(204).end()
} }
function remoteVideosQadu (req, res, next) {
const requests = req.body.data
const fromPod = res.locals.secure.pod
eachSeries(requests, function (request, callbackEach) {
const videoData = request.data
quickAndDirtyUpdateVideoRetryWrapper(videoData, fromPod, callbackEach)
}, function (err) {
if (err) logger.error('Error managing remote videos.', { error: err })
})
return res.type('json').status(204).end()
}
function quickAndDirtyUpdateVideoRetryWrapper (videoData, fromPod, finalCallback) {
const options = {
arguments: [ videoData, fromPod ],
errorMessage: 'Cannot update quick and dirty the remote video with many retries.'
}
databaseUtils.retryTransactionWrapper(quickAndDirtyUpdateVideo, options, finalCallback)
}
function quickAndDirtyUpdateVideo (videoData, fromPod, finalCallback) {
waterfall([
databaseUtils.startSerializableTransaction,
function findVideo (t, callback) {
fetchVideo(fromPod.host, videoData.remoteId, function (err, videoInstance) {
return callback(err, t, videoInstance)
})
},
function updateVideoIntoDB (t, videoInstance, callback) {
const options = { transaction: t }
if (videoData.views) {
videoInstance.set('views', videoData.views)
}
if (videoData.likes) {
videoInstance.set('likes', videoData.likes)
}
if (videoData.dislikes) {
videoInstance.set('dislikes', videoData.dislikes)
}
videoInstance.save(options).asCallback(function (err) {
return callback(err, t)
})
},
databaseUtils.commitTransaction
], function (err, t) {
if (err) {
logger.debug('Cannot quick and dirty update the remote video.', { error: err })
return databaseUtils.rollbackTransaction(err, t, finalCallback)
}
logger.info('Remote video %s quick and dirty updated', videoData.name)
return finalCallback(null)
})
}
// Handle retries on fail // Handle retries on fail
function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) { function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) {
const options = { const options = {

View File

@ -320,6 +320,22 @@ function updateVideo (req, res, finalCallback) {
function getVideo (req, res, next) { function getVideo (req, res, next) {
const videoInstance = res.locals.video const videoInstance = res.locals.video
if (videoInstance.isOwned()) {
// The increment is done directly in the database, not using the instance value
videoInstance.increment('views').asCallback(function (err) {
if (err) {
logger.error('Cannot add view to video %d.', videoInstance.id)
return
}
// FIXME: make a real view system
// For example, only add a view when a user watch a video during 30s etc
friends.quickAndDirtyUpdateVideoToFriends(videoInstance.id, constants.REQUEST_VIDEO_QADU_TYPES.VIEWS)
})
}
// Do not wait the view system
res.json(videoInstance.toFormatedJSON()) res.json(videoInstance.toFormatedJSON())
} }

View File

@ -1,5 +1,7 @@
'use strict' 'use strict'
const has = require('lodash/has')
const constants = require('../../../initializers/constants') const constants = require('../../../initializers/constants')
const videosValidators = require('../videos') const videosValidators = require('../videos')
const miscValidators = require('../misc') const miscValidators = require('../misc')
@ -7,7 +9,8 @@ const miscValidators = require('../misc')
const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
const remoteVideosValidators = { const remoteVideosValidators = {
isEachRemoteRequestVideosValid isEachRemoteRequestVideosValid,
isEachRemoteRequestVideosQaduValid
} }
function isEachRemoteRequestVideosValid (requests) { function isEachRemoteRequestVideosValid (requests) {
@ -16,13 +19,13 @@ function isEachRemoteRequestVideosValid (requests) {
const video = request.data const video = request.data
return ( return (
isRequestTypeAddValid(request.type) && isRequestTypeAddValid(request.type) &&
isCommonVideoAttrbiutesValid(video) && isCommonVideoAttributesValid(video) &&
videosValidators.isVideoAuthorValid(video.author) && videosValidators.isVideoAuthorValid(video.author) &&
videosValidators.isVideoThumbnailDataValid(video.thumbnailData) videosValidators.isVideoThumbnailDataValid(video.thumbnailData)
) || ) ||
( (
isRequestTypeUpdateValid(request.type) && isRequestTypeUpdateValid(request.type) &&
isCommonVideoAttrbiutesValid(video) isCommonVideoAttributesValid(video)
) || ) ||
( (
isRequestTypeRemoveValid(request.type) && isRequestTypeRemoveValid(request.type) &&
@ -37,13 +40,27 @@ function isEachRemoteRequestVideosValid (requests) {
}) })
} }
function isEachRemoteRequestVideosQaduValid (requests) {
return miscValidators.isArray(requests) &&
requests.every(function (request) {
const video = request.data
return (
videosValidators.isVideoRemoteIdValid(video.remoteId) &&
(has(video, 'views') === false || videosValidators.isVideoViewsValid) &&
(has(video, 'likes') === false || videosValidators.isVideoLikesValid) &&
(has(video, 'dislikes') === false || videosValidators.isVideoDislikesValid)
)
})
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
module.exports = remoteVideosValidators module.exports = remoteVideosValidators
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function isCommonVideoAttrbiutesValid (video) { function isCommonVideoAttributesValid (video) {
return videosValidators.isVideoDateValid(video.createdAt) && return videosValidators.isVideoDateValid(video.createdAt) &&
videosValidators.isVideoDateValid(video.updatedAt) && videosValidators.isVideoDateValid(video.updatedAt) &&
videosValidators.isVideoDescriptionValid(video.description) && videosValidators.isVideoDescriptionValid(video.description) &&

View File

@ -22,7 +22,10 @@ const videosValidators = {
isVideoRemoteIdValid, isVideoRemoteIdValid,
isVideoAbuseReasonValid, isVideoAbuseReasonValid,
isVideoAbuseReporterUsernameValid, isVideoAbuseReporterUsernameValid,
isVideoFile isVideoFile,
isVideoViewsValid,
isVideoLikesValid,
isVideoDislikesValid
} }
function isVideoAuthorValid (value) { function isVideoAuthorValid (value) {
@ -82,6 +85,18 @@ function isVideoAbuseReporterUsernameValid (value) {
return usersValidators.isUserUsernameValid(value) return usersValidators.isUserUsernameValid(value)
} }
function isVideoViewsValid (value) {
return validator.isInt(value, { min: 0 })
}
function isVideoLikesValid (value) {
return validator.isInt(value, { min: 0 })
}
function isVideoDislikesValid (value) {
return validator.isInt(value, { min: 0 })
}
function isVideoFile (value, files) { function isVideoFile (value, files) {
// Should have files // Should have files
if (!files) return false if (!files) return false

View File

@ -58,6 +58,8 @@ function makeSecureRequest (params, callback) {
requestParams.json.data = params.data requestParams.json.data = params.data
} }
console.log(requestParams.json.data)
request.post(requestParams, callback) request.post(requestParams, callback)
} }

View File

@ -5,7 +5,7 @@ const path = require('path')
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const LAST_MIGRATION_VERSION = 10 const LAST_MIGRATION_VERSION = 15
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -24,7 +24,7 @@ const SEARCHABLE_COLUMNS = {
const SORTABLE_COLUMNS = { const SORTABLE_COLUMNS = {
USERS: [ 'id', '-id', 'username', '-username', 'createdAt', '-createdAt' ], USERS: [ 'id', '-id', 'username', '-username', 'createdAt', '-createdAt' ],
VIDEO_ABUSES: [ 'id', '-id', 'createdAt', '-createdAt' ], VIDEO_ABUSES: [ 'id', '-id', 'createdAt', '-createdAt' ],
VIDEOS: [ 'name', '-name', 'duration', '-duration', 'createdAt', '-createdAt' ] VIDEOS: [ 'name', '-name', 'duration', '-duration', 'createdAt', '-createdAt', 'views', '-views' ]
} }
const OAUTH_LIFETIME = { const OAUTH_LIFETIME = {
@ -116,11 +116,16 @@ const REQUESTS_LIMIT_PODS = 10
// How many requests we send to a pod per interval // How many requests we send to a pod per interval
const REQUESTS_LIMIT_PER_POD = 5 const REQUESTS_LIMIT_PER_POD = 5
const REQUESTS_VIDEO_QADU_LIMIT_PODS = 10
// The QADU requests are not big
const REQUESTS_VIDEO_QADU_LIMIT_PER_POD = 50
// Number of requests to retry for replay requests module // Number of requests to retry for replay requests module
const RETRY_REQUESTS = 5 const RETRY_REQUESTS = 5
const REQUEST_ENDPOINTS = { const REQUEST_ENDPOINTS = {
VIDEOS: 'videos' VIDEOS: 'videos',
QADU: 'videos/qadu'
} }
const REQUEST_ENDPOINT_ACTIONS = {} const REQUEST_ENDPOINT_ACTIONS = {}
REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] = { REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] = {
@ -130,6 +135,12 @@ REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] = {
REPORT_ABUSE: 'report-abuse' REPORT_ABUSE: 'report-abuse'
} }
const REQUEST_VIDEO_QADU_TYPES = {
LIKES: 'likes',
DISLIKES: 'dislikes',
VIEWS: 'views'
}
const REMOTE_SCHEME = { const REMOTE_SCHEME = {
HTTP: 'https', HTTP: 'https',
WS: 'wss' WS: 'wss'
@ -199,10 +210,13 @@ module.exports = {
REMOTE_SCHEME, REMOTE_SCHEME,
REQUEST_ENDPOINT_ACTIONS, REQUEST_ENDPOINT_ACTIONS,
REQUEST_ENDPOINTS, REQUEST_ENDPOINTS,
REQUEST_VIDEO_QADU_TYPES,
REQUESTS_IN_PARALLEL, REQUESTS_IN_PARALLEL,
REQUESTS_INTERVAL, REQUESTS_INTERVAL,
REQUESTS_LIMIT_PER_POD, REQUESTS_LIMIT_PER_POD,
REQUESTS_LIMIT_PODS, REQUESTS_LIMIT_PODS,
REQUESTS_VIDEO_QADU_LIMIT_PER_POD,
REQUESTS_VIDEO_QADU_LIMIT_PODS,
RETRY_REQUESTS, RETRY_REQUESTS,
SEARCHABLE_COLUMNS, SEARCHABLE_COLUMNS,
SIGNATURE_ALGORITHM, SIGNATURE_ALGORITHM,

View File

@ -0,0 +1,19 @@
'use strict'
// utils = { transaction, queryInterface, sequelize, Sequelize }
exports.up = function (utils, finalCallback) {
const q = utils.queryInterface
const Sequelize = utils.Sequelize
const data = {
type: Sequelize.INTEGER,
allowNull: false,
defaultValue: 0
}
q.addColumn('Videos', 'views', data, { transaction: utils.transaction }).asCallback(finalCallback)
}
exports.down = function (options, callback) {
throw new Error('Not implemented.')
}

View File

@ -0,0 +1,140 @@
'use strict'
const eachLimit = require('async/eachLimit')
const constants = require('../initializers/constants')
const db = require('../initializers/database')
const logger = require('../helpers/logger')
const requests = require('../helpers/requests')
module.exports = class BaseRequestScheduler {
constructor (options) {
this.lastRequestTimestamp = 0
this.timer = null
}
activate () {
logger.info('Requests scheduler activated.')
this.lastRequestTimestamp = Date.now()
this.timer = setInterval(() => {
this.lastRequestTimestamp = Date.now()
this.makeRequests()
}, constants.REQUESTS_INTERVAL)
}
deactivate () {
logger.info('Requests scheduler deactivated.')
clearInterval(this.timer)
this.timer = null
}
forceSend () {
logger.info('Force requests scheduler sending.')
this.makeRequests()
}
remainingMilliSeconds () {
if (this.timer === null) return -1
return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
}
// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
if (!callback) callback = function () {}
const params = {
toPod: toPod,
sign: true, // Prove our identity
method: 'POST',
path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
data: requestsToMake // Requests we need to make
}
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
requests.makeSecureRequest(params, (err, res) => {
if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
err = err ? err.message : 'Status code not 20x : ' + res.statusCode
logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
return callback(false)
}
return callback(true)
})
}
// Make all the requests of the scheduler
makeRequests () {
this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
if (err) {
logger.error('Cannot get the list of "%s".', this.description, { err: err })
return // Abort
}
// If there are no requests, abort
if (requests.length === 0) {
logger.info('No "%s" to make.', this.description)
return
}
// We want to group requests by destinations pod and endpoint
const requestsToMakeGrouped = this.buildRequestObjects(requests)
logger.info('Making "%s" to friends.', this.description)
const goodPods = []
const badPods = []
eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
const requestToMake = requestsToMakeGrouped[hashKey]
const toPod = requestToMake.toPod
// Maybe the pod is not our friend anymore so simply remove it
if (!toPod) {
const requestIdsToDelete = requestToMake.ids
logger.info('Removing %d "%s" of unexisting pod %s.', requestIdsToDelete.length, this.description, requestToMake.toPod.id)
return this.getRequestToPodModel().removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
}
this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
if (success === false) {
badPods.push(requestToMake.toPod.id)
return callbackEach()
}
logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
goodPods.push(requestToMake.toPod.id)
// Remove the pod id of these request ids
this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
this.afterRequestHook()
})
}, () => {
// All the requests were made, we update the pods score
db.Pod.updatePodsScore(goodPods, badPods)
this.afterRequestsHook()
})
})
}
flush (callback) {
this.getRequestModel().removeAll(callback)
}
afterRequestHook () {
// Nothing to do, let children reimplement it
}
afterRequestsHook () {
// Nothing to do, let children reimplement it
}
}

View File

@ -12,15 +12,19 @@ const logger = require('../helpers/logger')
const peertubeCrypto = require('../helpers/peertube-crypto') const peertubeCrypto = require('../helpers/peertube-crypto')
const requests = require('../helpers/requests') const requests = require('../helpers/requests')
const RequestScheduler = require('./request-scheduler') const RequestScheduler = require('./request-scheduler')
const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
const requestScheduler = new RequestScheduler() const requestScheduler = new RequestScheduler()
const requestSchedulerVideoQadu = new RequestVideoQaduScheduler()
const friends = { const friends = {
activate, activate,
addVideoToFriends, addVideoToFriends,
updateVideoToFriends, updateVideoToFriends,
reportAbuseVideoToFriend, reportAbuseVideoToFriend,
quickAndDirtyUpdateVideoToFriends,
hasFriends, hasFriends,
makeFriends, makeFriends,
quitFriends, quitFriends,
@ -30,6 +34,7 @@ const friends = {
function activate () { function activate () {
requestScheduler.activate() requestScheduler.activate()
requestSchedulerVideoQadu.activate()
} }
function addVideoToFriends (videoData, transaction, callback) { function addVideoToFriends (videoData, transaction, callback) {
@ -71,6 +76,15 @@ function reportAbuseVideoToFriend (reportData, video) {
createRequest(options) createRequest(options)
} }
function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
const options = {
videoId,
type,
transaction
}
return createVideoQaduRequest(options, callback)
}
function hasFriends (callback) { function hasFriends (callback) {
db.Pod.countAll(function (err, count) { db.Pod.countAll(function (err, count) {
if (err) return callback(err) if (err) return callback(err)
@ -110,7 +124,11 @@ function quitFriends (callback) {
waterfall([ waterfall([
function flushRequests (callbackAsync) { function flushRequests (callbackAsync) {
requestScheduler.flush(callbackAsync) requestScheduler.flush(err => callbackAsync(err))
},
function flushVideoQaduRequests (callbackAsync) {
requestSchedulerVideoQadu.flush(err => callbackAsync(err))
}, },
function getPodsList (callbackAsync) { function getPodsList (callbackAsync) {
@ -310,6 +328,12 @@ function createRequest (options, callback) {
}) })
} }
function createVideoQaduRequest (options, callback) {
if (!callback) callback = function () {}
requestSchedulerVideoQadu.createRequest(options, callback)
}
function isMe (host) { function isMe (host) {
return host === constants.CONFIG.WEBSERVER.HOST return host === constants.CONFIG.WEBSERVER.HOST
} }

View File

@ -1,44 +1,54 @@
'use strict' 'use strict'
const eachLimit = require('async/eachLimit')
const constants = require('../initializers/constants') const constants = require('../initializers/constants')
const BaseRequestScheduler = require('./base-request-scheduler')
const db = require('../initializers/database') const db = require('../initializers/database')
const logger = require('../helpers/logger') const logger = require('../helpers/logger')
const requests = require('../helpers/requests')
module.exports = class RequestScheduler { module.exports = class RequestScheduler extends BaseRequestScheduler {
constructor () { constructor () {
this.lastRequestTimestamp = 0 super()
this.timer = null
// We limit the size of the requests
this.limitPods = constants.REQUESTS_LIMIT_PODS
this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD
this.description = 'requests'
} }
activate () { getRequestModel () {
logger.info('Requests scheduler activated.') return db.Request
this.lastRequestTimestamp = Date.now()
this.timer = setInterval(() => {
this.lastRequestTimestamp = Date.now()
this.makeRequests()
}, constants.REQUESTS_INTERVAL)
} }
deactivate () { getRequestToPodModel () {
logger.info('Requests scheduler deactivated.') return db.RequestToPod
clearInterval(this.timer)
this.timer = null
} }
forceSend () { buildRequestObjects (requests) {
logger.info('Force requests scheduler sending.') const requestsToMakeGrouped = {}
this.makeRequests()
Object.keys(requests).forEach(toPodId => {
requests[toPodId].forEach(data => {
const request = data.request
const pod = data.pod
const hashKey = toPodId + request.endpoint
if (!requestsToMakeGrouped[hashKey]) {
requestsToMakeGrouped[hashKey] = {
toPod: pod,
endpoint: request.endpoint,
ids: [], // request ids, to delete them from the DB in the future
datas: [] // requests data,
}
} }
remainingMilliSeconds () { requestsToMakeGrouped[hashKey].ids.push(request.id)
if (this.timer === null) return -1 requestsToMakeGrouped[hashKey].datas.push(request.request)
})
})
return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) return requestsToMakeGrouped
} }
// { type, endpoint, data, toIds, transaction } // { type, endpoint, data, toIds, transaction }
@ -79,122 +89,10 @@ module.exports = class RequestScheduler {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Make all the requests of the scheduler afterRequestsHook () {
makeRequests () {
// We limit the size of the requests
// We don't want to stuck with the same failing requests so we get a random list
db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => {
if (err) {
logger.error('Cannot get the list of requests.', { err: err })
return // Abort
}
// If there are no requests, abort
if (requests.length === 0) {
logger.info('No requests to make.')
return
}
// We want to group requests by destinations pod and endpoint
const requestsToMakeGrouped = this.buildRequestObjects(requests)
logger.info('Making requests to friends.')
const goodPods = []
const badPods = []
eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
const requestToMake = requestsToMakeGrouped[hashKey]
const toPod = requestToMake.toPod
// Maybe the pod is not our friend anymore so simply remove it
if (!toPod) {
const requestIdsToDelete = requestToMake.ids
logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
}
this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
if (success === false) {
badPods.push(requestToMake.toPod.id)
return callbackEach()
}
logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
goodPods.push(requestToMake.toPod.id)
// Remove the pod id of these request ids
db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
})
}, () => {
// All the requests were made, we update the pods score
db.Request.updatePodsScore(goodPods, badPods)
// Flush requests with no pod // Flush requests with no pod
db.Request.removeWithEmptyTo(err => { this.getRequestModel().removeWithEmptyTo(err => {
if (err) logger.error('Error when removing requests with no pods.', { error: err }) if (err) logger.error('Error when removing requests with no pods.', { error: err })
}) })
})
})
}
// Make a requests to friends of a certain type
makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
if (!callback) callback = function () {}
const params = {
toPod: toPod,
sign: true, // Prove our identity
method: 'POST',
path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
data: requestsToMake // Requests we need to make
}
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
requests.makeSecureRequest(params, (err, res) => {
if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
err = err ? err.message : 'Status code not 20x : ' + res.statusCode
logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
return callback(false)
}
return callback(true)
})
}
buildRequestObjects (requests) {
const requestsToMakeGrouped = {}
Object.keys(requests).forEach(toPodId => {
requests[toPodId].forEach(data => {
const request = data.request
const pod = data.pod
const hashKey = toPodId + request.endpoint
if (!requestsToMakeGrouped[hashKey]) {
requestsToMakeGrouped[hashKey] = {
toPod: pod,
endpoint: request.endpoint,
ids: [], // request ids, to delete them from the DB in the future
datas: [] // requests data,
}
}
requestsToMakeGrouped[hashKey].ids.push(request.id)
requestsToMakeGrouped[hashKey].datas.push(request.request)
})
})
return requestsToMakeGrouped
}
flush (callback) {
db.Request.removeAll(err => {
if (err) logger.error('Cannot flush the requests.', { error: err })
return callback(err)
})
} }
} }

View File

@ -0,0 +1,116 @@
'use strict'
const BaseRequestScheduler = require('./base-request-scheduler')
const constants = require('../initializers/constants')
const db = require('../initializers/database')
const logger = require('../helpers/logger')
module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
constructor () {
super()
// We limit the size of the requests
this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS
this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS
this.description = 'video QADU requests'
}
getRequestModel () {
return db.RequestVideoQadu
}
getRequestToPodModel () {
return db.RequestVideoQadu
}
buildRequestObjects (requests) {
const requestsToMakeGrouped = {}
Object.keys(requests).forEach(toPodId => {
requests[toPodId].forEach(data => {
const request = data.request
const video = data.video
const pod = data.pod
const hashKey = toPodId
if (!requestsToMakeGrouped[hashKey]) {
requestsToMakeGrouped[hashKey] = {
toPod: pod,
endpoint: constants.REQUEST_ENDPOINTS.QADU,
ids: [], // request ids, to delete them from the DB in the future
datas: [], // requests data
videos: {}
}
}
if (!requestsToMakeGrouped[hashKey].videos[video.id]) {
requestsToMakeGrouped[hashKey].videos[video.id] = {}
}
const videoData = requestsToMakeGrouped[hashKey].videos[video.id]
switch (request.type) {
case constants.REQUEST_VIDEO_QADU_TYPES.LIKES:
videoData.likes = video.likes
break
case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES:
videoData.likes = video.dislikes
break
case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS:
videoData.views = video.views
break
default:
logger.error('Unknown request video QADU type %s.', request.type)
return
}
// Do not forget the remoteId so the remote pod can identify the video
videoData.remoteId = video.id
requestsToMakeGrouped[hashKey].ids.push(request.id)
requestsToMakeGrouped[hashKey].videos[video.id] = videoData
})
})
Object.keys(requestsToMakeGrouped).forEach(hashKey => {
Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => {
const videoData = requestsToMakeGrouped[hashKey].videos[videoId]
requestsToMakeGrouped[hashKey].datas.push({
data: videoData
})
})
// We don't need it anymore, it was just to build our datas array
delete requestsToMakeGrouped[hashKey].videos
})
return requestsToMakeGrouped
}
// { type, videoId, transaction? }
createRequest (options, callback) {
const type = options.type
const videoId = options.videoId
const transaction = options.transaction
const dbRequestOptions = {}
if (transaction) dbRequestOptions.transaction = transaction
// Send the update to all our friends
db.Pod.listAllIds(options.transaction, function (err, podIds) {
if (err) return callback(err)
const queries = []
podIds.forEach(podId => {
queries.push({ type, videoId, podId })
})
return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback)
})
}
}

View File

@ -4,7 +4,8 @@ const checkErrors = require('../utils').checkErrors
const logger = require('../../../helpers/logger') const logger = require('../../../helpers/logger')
const validatorsRemoteVideos = { const validatorsRemoteVideos = {
remoteVideos remoteVideos,
remoteQaduVideos
} }
function remoteVideos (req, res, next) { function remoteVideos (req, res, next) {
@ -15,6 +16,14 @@ function remoteVideos (req, res, next) {
checkErrors(req, res, next) checkErrors(req, res, next)
} }
function remoteQaduVideos (req, res, next) {
req.checkBody('data').isEachRemoteRequestVideosQaduValid()
logger.debug('Checking remoteVideosQadu parameters', { parameters: req.body })
checkErrors(req, res, next)
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
module.exports = validatorsRemoteVideos module.exports = validatorsRemoteVideos

View File

@ -1,8 +1,11 @@
'use strict' 'use strict'
const each = require('async/each')
const map = require('lodash/map') const map = require('lodash/map')
const waterfall = require('async/waterfall')
const constants = require('../initializers/constants') const constants = require('../initializers/constants')
const logger = require('../helpers/logger')
const customPodsValidators = require('../helpers/custom-validators').pods const customPodsValidators = require('../helpers/custom-validators').pods
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -62,6 +65,7 @@ module.exports = function (sequelize, DataTypes) {
listBadPods, listBadPods,
load, load,
loadByHost, loadByHost,
updatePodsScore,
removeAll removeAll
}, },
instanceMethods: { instanceMethods: {
@ -144,7 +148,7 @@ function listAllIds (transaction, callback) {
}) })
} }
function listRandomPodIdsWithRequest (limit, callback) { function listRandomPodIdsWithRequest (limit, tableRequestPod, callback) {
const self = this const self = this
self.count().asCallback(function (err, count) { self.count().asCallback(function (err, count) {
@ -166,7 +170,7 @@ function listRandomPodIdsWithRequest (limit, callback) {
where: { where: {
id: { id: {
$in: [ $in: [
this.sequelize.literal('SELECT "podId" FROM "RequestToPods"') this.sequelize.literal('SELECT "podId" FROM "' + tableRequestPod + '"')
] ]
} }
} }
@ -207,3 +211,58 @@ function loadByHost (host, callback) {
function removeAll (callback) { function removeAll (callback) {
return this.destroy().asCallback(callback) return this.destroy().asCallback(callback)
} }
function updatePodsScore (goodPods, badPods) {
const self = this
logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
if (goodPods.length !== 0) {
this.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
if (err) logger.error('Cannot increment scores of good pods.', { error: err })
})
}
if (badPods.length !== 0) {
this.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
removeBadPods.call(self)
})
}
}
// ---------------------------------------------------------------------------
// Remove pods with a score of 0 (too many requests where they were unreachable)
function removeBadPods () {
const self = this
waterfall([
function findBadPods (callback) {
self.sequelize.models.Pod.listBadPods(function (err, pods) {
if (err) {
logger.error('Cannot find bad pods.', { error: err })
return callback(err)
}
return callback(null, pods)
})
},
function removeTheseBadPods (pods, callback) {
each(pods, function (pod, callbackEach) {
pod.destroy().asCallback(callbackEach)
}, function (err) {
return callback(err, pods.length)
})
}
], function (err, numberOfPodsRemoved) {
if (err) {
logger.error('Cannot remove bad pods.', { error: err })
} else if (numberOfPodsRemoved) {
logger.info('Removed %d pods.', numberOfPodsRemoved)
} else {
logger.info('No need to remove bad pods.')
}
})
}

View File

@ -17,7 +17,7 @@ module.exports = function (sequelize, DataTypes) {
} }
], ],
classMethods: { classMethods: {
removePodOf removeByRequestIdsAndPod
} }
}) })
@ -26,7 +26,7 @@ module.exports = function (sequelize, DataTypes) {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function removePodOf (requestsIds, podId, callback) { function removeByRequestIdsAndPod (requestsIds, podId, callback) {
if (!callback) callback = function () {} if (!callback) callback = function () {}
const query = { const query = {

View File

@ -0,0 +1,154 @@
'use strict'
/*
Request Video for Quick And Dirty Updates like:
- views
- likes
- dislikes
We can't put it in the same system than basic requests for efficiency.
Moreover we don't want to slow down the basic requests with a lot of views/likes/dislikes requests.
So we put it an independant request scheduler.
*/
const values = require('lodash/values')
const constants = require('../initializers/constants')
// ---------------------------------------------------------------------------
module.exports = function (sequelize, DataTypes) {
const RequestVideoQadu = sequelize.define('RequestVideoQadu',
{
type: {
type: DataTypes.ENUM(values(constants.REQUEST_VIDEO_QADU_TYPES)),
allowNull: false
}
},
{
timestamps: false,
indexes: [
{
fields: [ 'podId' ]
},
{
fields: [ 'videoId' ]
}
],
classMethods: {
associate,
listWithLimitAndRandom,
countTotalRequests,
removeAll,
removeByRequestIdsAndPod
}
}
)
return RequestVideoQadu
}
// ------------------------------ STATICS ------------------------------
function associate (models) {
this.belongsTo(models.Pod, {
foreignKey: {
name: 'podId',
allowNull: false
},
onDelete: 'CASCADE'
})
this.belongsTo(models.Video, {
foreignKey: {
name: 'videoId',
allowNull: false
},
onDelete: 'CASCADE'
})
}
function countTotalRequests (callback) {
const query = {
include: [ this.sequelize.models.Pod ]
}
return this.count(query).asCallback(callback)
}
function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
const self = this
const Pod = this.sequelize.models.Pod
Pod.listRandomPodIdsWithRequest(limitPods, 'RequestVideoQadus', function (err, podIds) {
if (err) return callback(err)
// We don't have friends that have requests
if (podIds.length === 0) return callback(null, [])
const query = {
include: [
{
model: self.sequelize.models.Pod,
where: {
id: {
$in: podIds
}
}
},
{
model: self.sequelize.models.Video
}
]
}
self.findAll(query).asCallback(function (err, requests) {
if (err) return callback(err)
const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod)
return callback(err, requestsGrouped)
})
})
}
function removeByRequestIdsAndPod (ids, podId, callback) {
const query = {
where: {
id: {
$in: ids
},
podId
}
}
this.destroy(query).asCallback(callback)
}
function removeAll (callback) {
// Delete all requests
this.truncate({ cascade: true }).asCallback(callback)
}
// ---------------------------------------------------------------------------
function groupAndTruncateRequests (requests, limitRequestsPerPod) {
const requestsGrouped = {}
requests.forEach(function (request) {
const pod = request.Pod
if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
requestsGrouped[pod.id].push({
request: request,
video: request.Video,
pod
})
}
})
return requestsGrouped
}

View File

@ -1,11 +1,8 @@
'use strict' 'use strict'
const each = require('async/each')
const waterfall = require('async/waterfall')
const values = require('lodash/values') const values = require('lodash/values')
const constants = require('../initializers/constants') const constants = require('../initializers/constants')
const logger = require('../helpers/logger')
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -28,8 +25,6 @@ module.exports = function (sequelize, DataTypes) {
listWithLimitAndRandom, listWithLimitAndRandom,
countTotalRequests, countTotalRequests,
removeBadPods,
updatePodsScore,
removeAll, removeAll,
removeWithEmptyTo removeWithEmptyTo
} }
@ -60,71 +55,17 @@ function countTotalRequests (callback) {
return this.count(query).asCallback(callback) return this.count(query).asCallback(callback)
} }
// Remove pods with a score of 0 (too many requests where they were unreachable)
function removeBadPods () {
const self = this
waterfall([
function findBadPods (callback) {
self.sequelize.models.Pod.listBadPods(function (err, pods) {
if (err) {
logger.error('Cannot find bad pods.', { error: err })
return callback(err)
}
return callback(null, pods)
})
},
function removeTheseBadPods (pods, callback) {
each(pods, function (pod, callbackEach) {
pod.destroy().asCallback(callbackEach)
}, function (err) {
return callback(err, pods.length)
})
}
], function (err, numberOfPodsRemoved) {
if (err) {
logger.error('Cannot remove bad pods.', { error: err })
} else if (numberOfPodsRemoved) {
logger.info('Removed %d pods.', numberOfPodsRemoved)
} else {
logger.info('No need to remove bad pods.')
}
})
}
function updatePodsScore (goodPods, badPods) {
const self = this
const Pod = this.sequelize.models.Pod
logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
if (goodPods.length !== 0) {
Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
if (err) logger.error('Cannot increment scores of good pods.', { error: err })
})
}
if (badPods.length !== 0) {
Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
removeBadPods.call(self)
})
}
}
function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
const self = this const self = this
const Pod = this.sequelize.models.Pod const Pod = this.sequelize.models.Pod
Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) { Pod.listRandomPodIdsWithRequest(limitPods, 'RequestToPods', function (err, podIds) {
if (err) return callback(err) if (err) return callback(err)
// We don't have friends that have requests // We don't have friends that have requests
if (podIds.length === 0) return callback(null, []) if (podIds.length === 0) return callback(null, [])
// The the first x requests of these pods // The first x requests of these pods
// It is very important to sort by id ASC to keep the requests order! // It is very important to sort by id ASC to keep the requests order!
const query = { const query = {
order: [ order: [

View File

@ -80,6 +80,15 @@ module.exports = function (sequelize, DataTypes) {
if (res === false) throw new Error('Video duration is not valid.') if (res === false) throw new Error('Video duration is not valid.')
} }
} }
},
views: {
type: DataTypes.INTEGER,
allowNull: false,
defaultValue: 0,
validate: {
min: 0,
isInt: true
}
} }
}, },
{ {
@ -101,6 +110,9 @@ module.exports = function (sequelize, DataTypes) {
}, },
{ {
fields: [ 'infoHash' ] fields: [ 'infoHash' ]
},
{
fields: [ 'views' ]
} }
], ],
classMethods: { classMethods: {
@ -336,6 +348,7 @@ function toFormatedJSON () {
magnetUri: this.generateMagnetUri(), magnetUri: this.generateMagnetUri(),
author: this.Author.name, author: this.Author.name,
duration: this.duration, duration: this.duration,
views: this.views,
tags: map(this.Tags, 'name'), tags: map(this.Tags, 'name'),
thumbnailPath: pathUtils.join(constants.STATIC_PATHS.THUMBNAILS, this.getThumbnailName()), thumbnailPath: pathUtils.join(constants.STATIC_PATHS.THUMBNAILS, this.getThumbnailName()),
createdAt: this.createdAt, createdAt: this.createdAt,

View File

@ -3,6 +3,7 @@
const chai = require('chai') const chai = require('chai')
const each = require('async/each') const each = require('async/each')
const expect = chai.expect const expect = chai.expect
const parallel = require('async/parallel')
const series = require('async/series') const series = require('async/series')
const WebTorrent = require('webtorrent') const WebTorrent = require('webtorrent')
const webtorrent = new WebTorrent() const webtorrent = new WebTorrent()
@ -375,6 +376,63 @@ describe('Test multiple pods', function () {
}) })
}) })
describe('Should update video views', function () {
let videoId1
let videoId2
before(function (done) {
videosUtils.getVideosList(servers[2].url, function (err, res) {
if (err) throw err
const videos = res.body.data.filter(video => video.isLocal === true)
videoId1 = videos[0].id
videoId2 = videos[1].id
done()
})
})
it('Should views multiple videos on owned servers', function (done) {
this.timeout(30000)
parallel([
function (callback) {
videosUtils.getVideo(servers[2].url, videoId1, callback)
},
function (callback) {
videosUtils.getVideo(servers[2].url, videoId1, callback)
},
function (callback) {
videosUtils.getVideo(servers[2].url, videoId1, callback)
},
function (callback) {
videosUtils.getVideo(servers[2].url, videoId2, callback)
}
], function (err) {
if (err) throw err
setTimeout(done, 22000)
})
})
it('Should have views updated on each pod', function (done) {
each(servers, function (server, callback) {
videosUtils.getVideosList(server.url, function (err, res) {
if (err) throw err
const videos = res.body.data
expect(videos.find(video => video.views === 3)).to.be.exist
expect(videos.find(video => video.views === 1)).to.be.exist
callback()
})
}, done)
})
})
/*
describe('Should manipulate these videos', function () { describe('Should manipulate these videos', function () {
it('Should update the video 3 by asking pod 3', function (done) { it('Should update the video 3 by asking pod 3', function (done) {
this.timeout(15000) this.timeout(15000)
@ -462,7 +520,7 @@ describe('Test multiple pods', function () {
}, done) }, done)
}) })
}) })
*/
after(function (done) { after(function (done) {
servers.forEach(function (server) { servers.forEach(function (server) {
process.kill(-server.app.pid) process.kill(-server.app.pid)

View File

@ -129,6 +129,17 @@ describe('Test a single pod', function () {
}) })
}) })
it('Should have the views updated', function (done) {
videosUtils.getVideo(server.url, videoId, function (err, res) {
if (err) throw err
const video = res.body
expect(video.views).to.equal(1)
done()
})
})
it('Should search the video by name by default', function (done) { it('Should search the video by name by default', function (done) {
videosUtils.searchVideo(server.url, 'my', function (err, res) { videosUtils.searchVideo(server.url, 'my', function (err, res) {
if (err) throw err if (err) throw err