diff --git a/server/controllers/api/remote/videos.js b/server/controllers/api/remote/videos.js index f8b4949cd..79b503d4d 100644 --- a/server/controllers/api/remote/videos.js +++ b/server/controllers/api/remote/videos.js @@ -31,6 +31,13 @@ router.post('/', remoteVideos ) +router.post('/qadu', + signatureValidators.signature, + secureMiddleware.checkSignature, + videosValidators.remoteQaduVideos, + remoteVideosQadu +) + // --------------------------------------------------------------------------- module.exports = router @@ -62,6 +69,73 @@ function remoteVideos (req, res, next) { return res.type('json').status(204).end() } +function remoteVideosQadu (req, res, next) { + const requests = req.body.data + const fromPod = res.locals.secure.pod + + eachSeries(requests, function (request, callbackEach) { + const videoData = request.data + + quickAndDirtyUpdateVideoRetryWrapper(videoData, fromPod, callbackEach) + }, function (err) { + if (err) logger.error('Error managing remote videos.', { error: err }) + }) + + return res.type('json').status(204).end() +} + +function quickAndDirtyUpdateVideoRetryWrapper (videoData, fromPod, finalCallback) { + const options = { + arguments: [ videoData, fromPod ], + errorMessage: 'Cannot update quick and dirty the remote video with many retries.' + } + + databaseUtils.retryTransactionWrapper(quickAndDirtyUpdateVideo, options, finalCallback) +} + +function quickAndDirtyUpdateVideo (videoData, fromPod, finalCallback) { + waterfall([ + databaseUtils.startSerializableTransaction, + + function findVideo (t, callback) { + fetchVideo(fromPod.host, videoData.remoteId, function (err, videoInstance) { + return callback(err, t, videoInstance) + }) + }, + + function updateVideoIntoDB (t, videoInstance, callback) { + const options = { transaction: t } + + if (videoData.views) { + videoInstance.set('views', videoData.views) + } + + if (videoData.likes) { + videoInstance.set('likes', videoData.likes) + } + + if (videoData.dislikes) { + videoInstance.set('dislikes', videoData.dislikes) + } + + videoInstance.save(options).asCallback(function (err) { + return callback(err, t) + }) + }, + + databaseUtils.commitTransaction + + ], function (err, t) { + if (err) { + logger.debug('Cannot quick and dirty update the remote video.', { error: err }) + return databaseUtils.rollbackTransaction(err, t, finalCallback) + } + + logger.info('Remote video %s quick and dirty updated', videoData.name) + return finalCallback(null) + }) +} + // Handle retries on fail function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) { const options = { diff --git a/server/controllers/api/videos.js b/server/controllers/api/videos.js index c936105e7..9f4bbb7b7 100644 --- a/server/controllers/api/videos.js +++ b/server/controllers/api/videos.js @@ -320,6 +320,22 @@ function updateVideo (req, res, finalCallback) { function getVideo (req, res, next) { const videoInstance = res.locals.video + + if (videoInstance.isOwned()) { + // The increment is done directly in the database, not using the instance value + videoInstance.increment('views').asCallback(function (err) { + if (err) { + logger.error('Cannot add view to video %d.', videoInstance.id) + return + } + + // FIXME: make a real view system + // For example, only add a view when a user watch a video during 30s etc + friends.quickAndDirtyUpdateVideoToFriends(videoInstance.id, constants.REQUEST_VIDEO_QADU_TYPES.VIEWS) + }) + } + + // Do not wait the view system res.json(videoInstance.toFormatedJSON()) } diff --git a/server/helpers/custom-validators/remote/videos.js b/server/helpers/custom-validators/remote/videos.js index ee68ebc10..2e9cf822e 100644 --- a/server/helpers/custom-validators/remote/videos.js +++ b/server/helpers/custom-validators/remote/videos.js @@ -1,5 +1,7 @@ 'use strict' +const has = require('lodash/has') + const constants = require('../../../initializers/constants') const videosValidators = require('../videos') const miscValidators = require('../misc') @@ -7,7 +9,8 @@ const miscValidators = require('../misc') const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] const remoteVideosValidators = { - isEachRemoteRequestVideosValid + isEachRemoteRequestVideosValid, + isEachRemoteRequestVideosQaduValid } function isEachRemoteRequestVideosValid (requests) { @@ -16,13 +19,13 @@ function isEachRemoteRequestVideosValid (requests) { const video = request.data return ( isRequestTypeAddValid(request.type) && - isCommonVideoAttrbiutesValid(video) && + isCommonVideoAttributesValid(video) && videosValidators.isVideoAuthorValid(video.author) && videosValidators.isVideoThumbnailDataValid(video.thumbnailData) ) || ( isRequestTypeUpdateValid(request.type) && - isCommonVideoAttrbiutesValid(video) + isCommonVideoAttributesValid(video) ) || ( isRequestTypeRemoveValid(request.type) && @@ -37,13 +40,27 @@ function isEachRemoteRequestVideosValid (requests) { }) } +function isEachRemoteRequestVideosQaduValid (requests) { + return miscValidators.isArray(requests) && + requests.every(function (request) { + const video = request.data + + return ( + videosValidators.isVideoRemoteIdValid(video.remoteId) && + (has(video, 'views') === false || videosValidators.isVideoViewsValid) && + (has(video, 'likes') === false || videosValidators.isVideoLikesValid) && + (has(video, 'dislikes') === false || videosValidators.isVideoDislikesValid) + ) + }) +} + // --------------------------------------------------------------------------- module.exports = remoteVideosValidators // --------------------------------------------------------------------------- -function isCommonVideoAttrbiutesValid (video) { +function isCommonVideoAttributesValid (video) { return videosValidators.isVideoDateValid(video.createdAt) && videosValidators.isVideoDateValid(video.updatedAt) && videosValidators.isVideoDescriptionValid(video.description) && diff --git a/server/helpers/custom-validators/videos.js b/server/helpers/custom-validators/videos.js index e2d2c8e6d..1d844118b 100644 --- a/server/helpers/custom-validators/videos.js +++ b/server/helpers/custom-validators/videos.js @@ -22,7 +22,10 @@ const videosValidators = { isVideoRemoteIdValid, isVideoAbuseReasonValid, isVideoAbuseReporterUsernameValid, - isVideoFile + isVideoFile, + isVideoViewsValid, + isVideoLikesValid, + isVideoDislikesValid } function isVideoAuthorValid (value) { @@ -82,6 +85,18 @@ function isVideoAbuseReporterUsernameValid (value) { return usersValidators.isUserUsernameValid(value) } +function isVideoViewsValid (value) { + return validator.isInt(value, { min: 0 }) +} + +function isVideoLikesValid (value) { + return validator.isInt(value, { min: 0 }) +} + +function isVideoDislikesValid (value) { + return validator.isInt(value, { min: 0 }) +} + function isVideoFile (value, files) { // Should have files if (!files) return false diff --git a/server/helpers/requests.js b/server/helpers/requests.js index 095b95e1c..427864117 100644 --- a/server/helpers/requests.js +++ b/server/helpers/requests.js @@ -58,6 +58,8 @@ function makeSecureRequest (params, callback) { requestParams.json.data = params.data } + console.log(requestParams.json.data) + request.post(requestParams, callback) } diff --git a/server/initializers/constants.js b/server/initializers/constants.js index 821580893..668bfe56c 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js @@ -5,7 +5,7 @@ const path = require('path') // --------------------------------------------------------------------------- -const LAST_MIGRATION_VERSION = 10 +const LAST_MIGRATION_VERSION = 15 // --------------------------------------------------------------------------- @@ -24,7 +24,7 @@ const SEARCHABLE_COLUMNS = { const SORTABLE_COLUMNS = { USERS: [ 'id', '-id', 'username', '-username', 'createdAt', '-createdAt' ], VIDEO_ABUSES: [ 'id', '-id', 'createdAt', '-createdAt' ], - VIDEOS: [ 'name', '-name', 'duration', '-duration', 'createdAt', '-createdAt' ] + VIDEOS: [ 'name', '-name', 'duration', '-duration', 'createdAt', '-createdAt', 'views', '-views' ] } const OAUTH_LIFETIME = { @@ -116,11 +116,16 @@ const REQUESTS_LIMIT_PODS = 10 // How many requests we send to a pod per interval const REQUESTS_LIMIT_PER_POD = 5 +const REQUESTS_VIDEO_QADU_LIMIT_PODS = 10 +// The QADU requests are not big +const REQUESTS_VIDEO_QADU_LIMIT_PER_POD = 50 + // Number of requests to retry for replay requests module const RETRY_REQUESTS = 5 const REQUEST_ENDPOINTS = { - VIDEOS: 'videos' + VIDEOS: 'videos', + QADU: 'videos/qadu' } const REQUEST_ENDPOINT_ACTIONS = {} REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] = { @@ -130,6 +135,12 @@ REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] = { REPORT_ABUSE: 'report-abuse' } +const REQUEST_VIDEO_QADU_TYPES = { + LIKES: 'likes', + DISLIKES: 'dislikes', + VIEWS: 'views' +} + const REMOTE_SCHEME = { HTTP: 'https', WS: 'wss' @@ -199,10 +210,13 @@ module.exports = { REMOTE_SCHEME, REQUEST_ENDPOINT_ACTIONS, REQUEST_ENDPOINTS, + REQUEST_VIDEO_QADU_TYPES, REQUESTS_IN_PARALLEL, REQUESTS_INTERVAL, REQUESTS_LIMIT_PER_POD, REQUESTS_LIMIT_PODS, + REQUESTS_VIDEO_QADU_LIMIT_PER_POD, + REQUESTS_VIDEO_QADU_LIMIT_PODS, RETRY_REQUESTS, SEARCHABLE_COLUMNS, SIGNATURE_ALGORITHM, diff --git a/server/initializers/migrations/0015-video-views.js b/server/initializers/migrations/0015-video-views.js new file mode 100644 index 000000000..ae49fe73c --- /dev/null +++ b/server/initializers/migrations/0015-video-views.js @@ -0,0 +1,19 @@ +'use strict' + +// utils = { transaction, queryInterface, sequelize, Sequelize } +exports.up = function (utils, finalCallback) { + const q = utils.queryInterface + const Sequelize = utils.Sequelize + + const data = { + type: Sequelize.INTEGER, + allowNull: false, + defaultValue: 0 + } + + q.addColumn('Videos', 'views', data, { transaction: utils.transaction }).asCallback(finalCallback) +} + +exports.down = function (options, callback) { + throw new Error('Not implemented.') +} diff --git a/server/lib/base-request-scheduler.js b/server/lib/base-request-scheduler.js new file mode 100644 index 000000000..d15680c25 --- /dev/null +++ b/server/lib/base-request-scheduler.js @@ -0,0 +1,140 @@ +'use strict' + +const eachLimit = require('async/eachLimit') + +const constants = require('../initializers/constants') +const db = require('../initializers/database') +const logger = require('../helpers/logger') +const requests = require('../helpers/requests') + +module.exports = class BaseRequestScheduler { + + constructor (options) { + this.lastRequestTimestamp = 0 + this.timer = null + } + + activate () { + logger.info('Requests scheduler activated.') + this.lastRequestTimestamp = Date.now() + + this.timer = setInterval(() => { + this.lastRequestTimestamp = Date.now() + this.makeRequests() + }, constants.REQUESTS_INTERVAL) + } + + deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(this.timer) + this.timer = null + } + + forceSend () { + logger.info('Force requests scheduler sending.') + this.makeRequests() + } + + remainingMilliSeconds () { + if (this.timer === null) return -1 + + return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) + } + + // --------------------------------------------------------------------------- + + // Make a requests to friends of a certain type + makeRequest (toPod, requestEndpoint, requestsToMake, callback) { + if (!callback) callback = function () {} + + const params = { + toPod: toPod, + sign: true, // Prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, (err, res) => { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + err = err ? err.message : 'Status code not 20x : ' + res.statusCode + logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) + + return callback(false) + } + + return callback(true) + }) + } + + // Make all the requests of the scheduler + makeRequests () { + this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { + if (err) { + logger.error('Cannot get the list of "%s".', this.description, { err: err }) + return // Abort + } + + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No "%s" to make.', this.description) + return + } + + // We want to group requests by destinations pod and endpoint + const requestsToMakeGrouped = this.buildRequestObjects(requests) + + logger.info('Making "%s" to friends.', this.description) + + const goodPods = [] + const badPods = [] + + eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { + const requestToMake = requestsToMakeGrouped[hashKey] + const toPod = requestToMake.toPod + + // Maybe the pod is not our friend anymore so simply remove it + if (!toPod) { + const requestIdsToDelete = requestToMake.ids + + logger.info('Removing %d "%s" of unexisting pod %s.', requestIdsToDelete.length, this.description, requestToMake.toPod.id) + return this.getRequestToPodModel().removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) + } + + this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { + if (success === false) { + badPods.push(requestToMake.toPod.id) + return callbackEach() + } + + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) + + // Remove the pod id of these request ids + this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) + + this.afterRequestHook() + }) + }, () => { + // All the requests were made, we update the pods score + db.Pod.updatePodsScore(goodPods, badPods) + + this.afterRequestsHook() + }) + }) + } + + flush (callback) { + this.getRequestModel().removeAll(callback) + } + + afterRequestHook () { + // Nothing to do, let children reimplement it + } + + afterRequestsHook () { + // Nothing to do, let children reimplement it + } +} diff --git a/server/lib/friends.js b/server/lib/friends.js index d53ab4553..424a30801 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -12,15 +12,19 @@ const logger = require('../helpers/logger') const peertubeCrypto = require('../helpers/peertube-crypto') const requests = require('../helpers/requests') const RequestScheduler = require('./request-scheduler') +const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler') const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] + const requestScheduler = new RequestScheduler() +const requestSchedulerVideoQadu = new RequestVideoQaduScheduler() const friends = { activate, addVideoToFriends, updateVideoToFriends, reportAbuseVideoToFriend, + quickAndDirtyUpdateVideoToFriends, hasFriends, makeFriends, quitFriends, @@ -30,6 +34,7 @@ const friends = { function activate () { requestScheduler.activate() + requestSchedulerVideoQadu.activate() } function addVideoToFriends (videoData, transaction, callback) { @@ -71,6 +76,15 @@ function reportAbuseVideoToFriend (reportData, video) { createRequest(options) } +function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) { + const options = { + videoId, + type, + transaction + } + return createVideoQaduRequest(options, callback) +} + function hasFriends (callback) { db.Pod.countAll(function (err, count) { if (err) return callback(err) @@ -110,7 +124,11 @@ function quitFriends (callback) { waterfall([ function flushRequests (callbackAsync) { - requestScheduler.flush(callbackAsync) + requestScheduler.flush(err => callbackAsync(err)) + }, + + function flushVideoQaduRequests (callbackAsync) { + requestSchedulerVideoQadu.flush(err => callbackAsync(err)) }, function getPodsList (callbackAsync) { @@ -310,6 +328,12 @@ function createRequest (options, callback) { }) } +function createVideoQaduRequest (options, callback) { + if (!callback) callback = function () {} + + requestSchedulerVideoQadu.createRequest(options, callback) +} + function isMe (host) { return host === constants.CONFIG.WEBSERVER.HOST } diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js index 28dabe339..6b6535519 100644 --- a/server/lib/request-scheduler.js +++ b/server/lib/request-scheduler.js @@ -1,44 +1,54 @@ 'use strict' -const eachLimit = require('async/eachLimit') - const constants = require('../initializers/constants') +const BaseRequestScheduler = require('./base-request-scheduler') const db = require('../initializers/database') const logger = require('../helpers/logger') -const requests = require('../helpers/requests') -module.exports = class RequestScheduler { +module.exports = class RequestScheduler extends BaseRequestScheduler { constructor () { - this.lastRequestTimestamp = 0 - this.timer = null + super() + + // We limit the size of the requests + this.limitPods = constants.REQUESTS_LIMIT_PODS + this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD + + this.description = 'requests' } - activate () { - logger.info('Requests scheduler activated.') - this.lastRequestTimestamp = Date.now() - - this.timer = setInterval(() => { - this.lastRequestTimestamp = Date.now() - this.makeRequests() - }, constants.REQUESTS_INTERVAL) + getRequestModel () { + return db.Request } - deactivate () { - logger.info('Requests scheduler deactivated.') - clearInterval(this.timer) - this.timer = null + getRequestToPodModel () { + return db.RequestToPod } - forceSend () { - logger.info('Force requests scheduler sending.') - this.makeRequests() - } + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} - remainingMilliSeconds () { - if (this.timer === null) return -1 + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const pod = data.pod + const hashKey = toPodId + request.endpoint - return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: request.endpoint, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data, + } + } + + requestsToMakeGrouped[hashKey].ids.push(request.id) + requestsToMakeGrouped[hashKey].datas.push(request.request) + }) + }) + + return requestsToMakeGrouped } // { type, endpoint, data, toIds, transaction } @@ -79,122 +89,10 @@ module.exports = class RequestScheduler { // --------------------------------------------------------------------------- - // Make all the requests of the scheduler - makeRequests () { - // We limit the size of the requests - // We don't want to stuck with the same failing requests so we get a random list - db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { - if (err) { - logger.error('Cannot get the list of requests.', { err: err }) - return // Abort - } - - // If there are no requests, abort - if (requests.length === 0) { - logger.info('No requests to make.') - return - } - - // We want to group requests by destinations pod and endpoint - const requestsToMakeGrouped = this.buildRequestObjects(requests) - - logger.info('Making requests to friends.') - - const goodPods = [] - const badPods = [] - - eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { - const requestToMake = requestsToMakeGrouped[hashKey] - const toPod = requestToMake.toPod - - // Maybe the pod is not our friend anymore so simply remove it - if (!toPod) { - const requestIdsToDelete = requestToMake.ids - - logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) - return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) - } - - this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { - if (success === false) { - badPods.push(requestToMake.toPod.id) - return callbackEach() - } - - logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) - goodPods.push(requestToMake.toPod.id) - - // Remove the pod id of these request ids - db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) - }) - }, () => { - // All the requests were made, we update the pods score - db.Request.updatePodsScore(goodPods, badPods) - // Flush requests with no pod - db.Request.removeWithEmptyTo(err => { - if (err) logger.error('Error when removing requests with no pods.', { error: err }) - }) - }) - }) - } - - // Make a requests to friends of a certain type - makeRequest (toPod, requestEndpoint, requestsToMake, callback) { - if (!callback) callback = function () {} - - const params = { - toPod: toPod, - sign: true, // Prove our identity - method: 'POST', - path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, - data: requestsToMake // Requests we need to make - } - - // Make multiple retry requests to all of pods - // The function fire some useful callbacks - requests.makeSecureRequest(params, (err, res) => { - if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { - err = err ? err.message : 'Status code not 20x : ' + res.statusCode - logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) - - return callback(false) - } - - return callback(true) - }) - } - - buildRequestObjects (requests) { - const requestsToMakeGrouped = {} - - Object.keys(requests).forEach(toPodId => { - requests[toPodId].forEach(data => { - const request = data.request - const pod = data.pod - const hashKey = toPodId + request.endpoint - - if (!requestsToMakeGrouped[hashKey]) { - requestsToMakeGrouped[hashKey] = { - toPod: pod, - endpoint: request.endpoint, - ids: [], // request ids, to delete them from the DB in the future - datas: [] // requests data, - } - } - - requestsToMakeGrouped[hashKey].ids.push(request.id) - requestsToMakeGrouped[hashKey].datas.push(request.request) - }) - }) - - return requestsToMakeGrouped - } - - flush (callback) { - db.Request.removeAll(err => { - if (err) logger.error('Cannot flush the requests.', { error: err }) - - return callback(err) + afterRequestsHook () { + // Flush requests with no pod + this.getRequestModel().removeWithEmptyTo(err => { + if (err) logger.error('Error when removing requests with no pods.', { error: err }) }) } } diff --git a/server/lib/request-video-qadu-scheduler.js b/server/lib/request-video-qadu-scheduler.js new file mode 100644 index 000000000..401b2fb44 --- /dev/null +++ b/server/lib/request-video-qadu-scheduler.js @@ -0,0 +1,116 @@ +'use strict' + +const BaseRequestScheduler = require('./base-request-scheduler') +const constants = require('../initializers/constants') +const db = require('../initializers/database') +const logger = require('../helpers/logger') + +module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler { + + constructor () { + super() + + // We limit the size of the requests + this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS + this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS + + this.description = 'video QADU requests' + } + + getRequestModel () { + return db.RequestVideoQadu + } + + getRequestToPodModel () { + return db.RequestVideoQadu + } + + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const video = data.video + const pod = data.pod + const hashKey = toPodId + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: constants.REQUEST_ENDPOINTS.QADU, + ids: [], // request ids, to delete them from the DB in the future + datas: [], // requests data + videos: {} + } + } + + if (!requestsToMakeGrouped[hashKey].videos[video.id]) { + requestsToMakeGrouped[hashKey].videos[video.id] = {} + } + + const videoData = requestsToMakeGrouped[hashKey].videos[video.id] + + switch (request.type) { + case constants.REQUEST_VIDEO_QADU_TYPES.LIKES: + videoData.likes = video.likes + break + + case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES: + videoData.likes = video.dislikes + break + + case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS: + videoData.views = video.views + break + + default: + logger.error('Unknown request video QADU type %s.', request.type) + return + } + + // Do not forget the remoteId so the remote pod can identify the video + videoData.remoteId = video.id + requestsToMakeGrouped[hashKey].ids.push(request.id) + requestsToMakeGrouped[hashKey].videos[video.id] = videoData + }) + }) + + Object.keys(requestsToMakeGrouped).forEach(hashKey => { + Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => { + const videoData = requestsToMakeGrouped[hashKey].videos[videoId] + + requestsToMakeGrouped[hashKey].datas.push({ + data: videoData + }) + }) + + // We don't need it anymore, it was just to build our datas array + delete requestsToMakeGrouped[hashKey].videos + }) + + return requestsToMakeGrouped + } + + // { type, videoId, transaction? } + createRequest (options, callback) { + const type = options.type + const videoId = options.videoId + const transaction = options.transaction + + const dbRequestOptions = {} + if (transaction) dbRequestOptions.transaction = transaction + + // Send the update to all our friends + db.Pod.listAllIds(options.transaction, function (err, podIds) { + if (err) return callback(err) + + const queries = [] + podIds.forEach(podId => { + queries.push({ type, videoId, podId }) + }) + + return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) + }) + } +} diff --git a/server/middlewares/validators/remote/videos.js b/server/middlewares/validators/remote/videos.js index cf9925b6c..ddc274c45 100644 --- a/server/middlewares/validators/remote/videos.js +++ b/server/middlewares/validators/remote/videos.js @@ -4,7 +4,8 @@ const checkErrors = require('../utils').checkErrors const logger = require('../../../helpers/logger') const validatorsRemoteVideos = { - remoteVideos + remoteVideos, + remoteQaduVideos } function remoteVideos (req, res, next) { @@ -15,6 +16,14 @@ function remoteVideos (req, res, next) { checkErrors(req, res, next) } +function remoteQaduVideos (req, res, next) { + req.checkBody('data').isEachRemoteRequestVideosQaduValid() + + logger.debug('Checking remoteVideosQadu parameters', { parameters: req.body }) + + checkErrors(req, res, next) +} + // --------------------------------------------------------------------------- module.exports = validatorsRemoteVideos diff --git a/server/models/pod.js b/server/models/pod.js index 79afb737a..14814708e 100644 --- a/server/models/pod.js +++ b/server/models/pod.js @@ -1,8 +1,11 @@ 'use strict' +const each = require('async/each') const map = require('lodash/map') +const waterfall = require('async/waterfall') const constants = require('../initializers/constants') +const logger = require('../helpers/logger') const customPodsValidators = require('../helpers/custom-validators').pods // --------------------------------------------------------------------------- @@ -62,6 +65,7 @@ module.exports = function (sequelize, DataTypes) { listBadPods, load, loadByHost, + updatePodsScore, removeAll }, instanceMethods: { @@ -144,7 +148,7 @@ function listAllIds (transaction, callback) { }) } -function listRandomPodIdsWithRequest (limit, callback) { +function listRandomPodIdsWithRequest (limit, tableRequestPod, callback) { const self = this self.count().asCallback(function (err, count) { @@ -166,7 +170,7 @@ function listRandomPodIdsWithRequest (limit, callback) { where: { id: { $in: [ - this.sequelize.literal('SELECT "podId" FROM "RequestToPods"') + this.sequelize.literal('SELECT "podId" FROM "' + tableRequestPod + '"') ] } } @@ -207,3 +211,58 @@ function loadByHost (host, callback) { function removeAll (callback) { return this.destroy().asCallback(callback) } + +function updatePodsScore (goodPods, badPods) { + const self = this + + logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) + + if (goodPods.length !== 0) { + this.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { + if (err) logger.error('Cannot increment scores of good pods.', { error: err }) + }) + } + + if (badPods.length !== 0) { + this.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { + if (err) logger.error('Cannot decrement scores of bad pods.', { error: err }) + removeBadPods.call(self) + }) + } +} + +// --------------------------------------------------------------------------- + +// Remove pods with a score of 0 (too many requests where they were unreachable) +function removeBadPods () { + const self = this + + waterfall([ + function findBadPods (callback) { + self.sequelize.models.Pod.listBadPods(function (err, pods) { + if (err) { + logger.error('Cannot find bad pods.', { error: err }) + return callback(err) + } + + return callback(null, pods) + }) + }, + + function removeTheseBadPods (pods, callback) { + each(pods, function (pod, callbackEach) { + pod.destroy().asCallback(callbackEach) + }, function (err) { + return callback(err, pods.length) + }) + } + ], function (err, numberOfPodsRemoved) { + if (err) { + logger.error('Cannot remove bad pods.', { error: err }) + } else if (numberOfPodsRemoved) { + logger.info('Removed %d pods.', numberOfPodsRemoved) + } else { + logger.info('No need to remove bad pods.') + } + }) +} diff --git a/server/models/request-to-pod.js b/server/models/request-to-pod.js index f42a53458..0e01a842e 100644 --- a/server/models/request-to-pod.js +++ b/server/models/request-to-pod.js @@ -17,7 +17,7 @@ module.exports = function (sequelize, DataTypes) { } ], classMethods: { - removePodOf + removeByRequestIdsAndPod } }) @@ -26,7 +26,7 @@ module.exports = function (sequelize, DataTypes) { // --------------------------------------------------------------------------- -function removePodOf (requestsIds, podId, callback) { +function removeByRequestIdsAndPod (requestsIds, podId, callback) { if (!callback) callback = function () {} const query = { diff --git a/server/models/request-video-qadu.js b/server/models/request-video-qadu.js new file mode 100644 index 000000000..7010fc992 --- /dev/null +++ b/server/models/request-video-qadu.js @@ -0,0 +1,154 @@ +'use strict' + +/* + Request Video for Quick And Dirty Updates like: + - views + - likes + - dislikes + + We can't put it in the same system than basic requests for efficiency. + Moreover we don't want to slow down the basic requests with a lot of views/likes/dislikes requests. + So we put it an independant request scheduler. +*/ + +const values = require('lodash/values') + +const constants = require('../initializers/constants') + +// --------------------------------------------------------------------------- + +module.exports = function (sequelize, DataTypes) { + const RequestVideoQadu = sequelize.define('RequestVideoQadu', + { + type: { + type: DataTypes.ENUM(values(constants.REQUEST_VIDEO_QADU_TYPES)), + allowNull: false + } + }, + { + timestamps: false, + indexes: [ + { + fields: [ 'podId' ] + }, + { + fields: [ 'videoId' ] + } + ], + classMethods: { + associate, + + listWithLimitAndRandom, + + countTotalRequests, + removeAll, + removeByRequestIdsAndPod + } + } + ) + + return RequestVideoQadu +} + +// ------------------------------ STATICS ------------------------------ + +function associate (models) { + this.belongsTo(models.Pod, { + foreignKey: { + name: 'podId', + allowNull: false + }, + onDelete: 'CASCADE' + }) + + this.belongsTo(models.Video, { + foreignKey: { + name: 'videoId', + allowNull: false + }, + onDelete: 'CASCADE' + }) +} + +function countTotalRequests (callback) { + const query = { + include: [ this.sequelize.models.Pod ] + } + + return this.count(query).asCallback(callback) +} + +function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { + const self = this + const Pod = this.sequelize.models.Pod + + Pod.listRandomPodIdsWithRequest(limitPods, 'RequestVideoQadus', function (err, podIds) { + if (err) return callback(err) + + // We don't have friends that have requests + if (podIds.length === 0) return callback(null, []) + + const query = { + include: [ + { + model: self.sequelize.models.Pod, + where: { + id: { + $in: podIds + } + } + }, + { + model: self.sequelize.models.Video + } + ] + } + + self.findAll(query).asCallback(function (err, requests) { + if (err) return callback(err) + + const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod) + return callback(err, requestsGrouped) + }) + }) +} + +function removeByRequestIdsAndPod (ids, podId, callback) { + const query = { + where: { + id: { + $in: ids + }, + podId + } + } + + this.destroy(query).asCallback(callback) +} + +function removeAll (callback) { + // Delete all requests + this.truncate({ cascade: true }).asCallback(callback) +} + +// --------------------------------------------------------------------------- + +function groupAndTruncateRequests (requests, limitRequestsPerPod) { + const requestsGrouped = {} + + requests.forEach(function (request) { + const pod = request.Pod + + if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] + + if (requestsGrouped[pod.id].length < limitRequestsPerPod) { + requestsGrouped[pod.id].push({ + request: request, + video: request.Video, + pod + }) + } + }) + + return requestsGrouped +} diff --git a/server/models/request.js b/server/models/request.js index ca616d130..de73501fc 100644 --- a/server/models/request.js +++ b/server/models/request.js @@ -1,11 +1,8 @@ 'use strict' -const each = require('async/each') -const waterfall = require('async/waterfall') const values = require('lodash/values') const constants = require('../initializers/constants') -const logger = require('../helpers/logger') // --------------------------------------------------------------------------- @@ -28,8 +25,6 @@ module.exports = function (sequelize, DataTypes) { listWithLimitAndRandom, countTotalRequests, - removeBadPods, - updatePodsScore, removeAll, removeWithEmptyTo } @@ -60,71 +55,17 @@ function countTotalRequests (callback) { return this.count(query).asCallback(callback) } -// Remove pods with a score of 0 (too many requests where they were unreachable) -function removeBadPods () { - const self = this - - waterfall([ - function findBadPods (callback) { - self.sequelize.models.Pod.listBadPods(function (err, pods) { - if (err) { - logger.error('Cannot find bad pods.', { error: err }) - return callback(err) - } - - return callback(null, pods) - }) - }, - - function removeTheseBadPods (pods, callback) { - each(pods, function (pod, callbackEach) { - pod.destroy().asCallback(callbackEach) - }, function (err) { - return callback(err, pods.length) - }) - } - ], function (err, numberOfPodsRemoved) { - if (err) { - logger.error('Cannot remove bad pods.', { error: err }) - } else if (numberOfPodsRemoved) { - logger.info('Removed %d pods.', numberOfPodsRemoved) - } else { - logger.info('No need to remove bad pods.') - } - }) -} - -function updatePodsScore (goodPods, badPods) { - const self = this - const Pod = this.sequelize.models.Pod - - logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) - - if (goodPods.length !== 0) { - Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { - if (err) logger.error('Cannot increment scores of good pods.', { error: err }) - }) - } - - if (badPods.length !== 0) { - Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { - if (err) logger.error('Cannot decrement scores of bad pods.', { error: err }) - removeBadPods.call(self) - }) - } -} - function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { const self = this const Pod = this.sequelize.models.Pod - Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) { + Pod.listRandomPodIdsWithRequest(limitPods, 'RequestToPods', function (err, podIds) { if (err) return callback(err) // We don't have friends that have requests if (podIds.length === 0) return callback(null, []) - // The the first x requests of these pods + // The first x requests of these pods // It is very important to sort by id ASC to keep the requests order! const query = { order: [ diff --git a/server/models/video.js b/server/models/video.js index d0fd61eb4..daa273845 100644 --- a/server/models/video.js +++ b/server/models/video.js @@ -80,6 +80,15 @@ module.exports = function (sequelize, DataTypes) { if (res === false) throw new Error('Video duration is not valid.') } } + }, + views: { + type: DataTypes.INTEGER, + allowNull: false, + defaultValue: 0, + validate: { + min: 0, + isInt: true + } } }, { @@ -101,6 +110,9 @@ module.exports = function (sequelize, DataTypes) { }, { fields: [ 'infoHash' ] + }, + { + fields: [ 'views' ] } ], classMethods: { @@ -336,6 +348,7 @@ function toFormatedJSON () { magnetUri: this.generateMagnetUri(), author: this.Author.name, duration: this.duration, + views: this.views, tags: map(this.Tags, 'name'), thumbnailPath: pathUtils.join(constants.STATIC_PATHS.THUMBNAILS, this.getThumbnailName()), createdAt: this.createdAt, diff --git a/server/tests/api/multiple-pods.js b/server/tests/api/multiple-pods.js index df12ba0e9..871db54be 100644 --- a/server/tests/api/multiple-pods.js +++ b/server/tests/api/multiple-pods.js @@ -3,6 +3,7 @@ const chai = require('chai') const each = require('async/each') const expect = chai.expect +const parallel = require('async/parallel') const series = require('async/series') const WebTorrent = require('webtorrent') const webtorrent = new WebTorrent() @@ -375,6 +376,63 @@ describe('Test multiple pods', function () { }) }) + describe('Should update video views', function () { + let videoId1 + let videoId2 + + before(function (done) { + videosUtils.getVideosList(servers[2].url, function (err, res) { + if (err) throw err + + const videos = res.body.data.filter(video => video.isLocal === true) + videoId1 = videos[0].id + videoId2 = videos[1].id + + done() + }) + }) + + it('Should views multiple videos on owned servers', function (done) { + this.timeout(30000) + + parallel([ + function (callback) { + videosUtils.getVideo(servers[2].url, videoId1, callback) + }, + + function (callback) { + videosUtils.getVideo(servers[2].url, videoId1, callback) + }, + + function (callback) { + videosUtils.getVideo(servers[2].url, videoId1, callback) + }, + + function (callback) { + videosUtils.getVideo(servers[2].url, videoId2, callback) + } + ], function (err) { + if (err) throw err + + setTimeout(done, 22000) + }) + }) + + it('Should have views updated on each pod', function (done) { + each(servers, function (server, callback) { + videosUtils.getVideosList(server.url, function (err, res) { + if (err) throw err + + const videos = res.body.data + expect(videos.find(video => video.views === 3)).to.be.exist + expect(videos.find(video => video.views === 1)).to.be.exist + + callback() + }) + }, done) + }) + }) +/* describe('Should manipulate these videos', function () { it('Should update the video 3 by asking pod 3', function (done) { this.timeout(15000) @@ -462,7 +520,7 @@ describe('Test multiple pods', function () { }, done) }) }) - +*/ after(function (done) { servers.forEach(function (server) { process.kill(-server.app.pid) diff --git a/server/tests/api/single-pod.js b/server/tests/api/single-pod.js index 83a2b4411..40c33686f 100644 --- a/server/tests/api/single-pod.js +++ b/server/tests/api/single-pod.js @@ -129,6 +129,17 @@ describe('Test a single pod', function () { }) }) + it('Should have the views updated', function (done) { + videosUtils.getVideo(server.url, videoId, function (err, res) { + if (err) throw err + + const video = res.body + expect(video.views).to.equal(1) + + done() + }) + }) + it('Should search the video by name by default', function (done) { videosUtils.searchVideo(server.url, 'my', function (err, res) { if (err) throw err