Try to make a better communication (between pods) module

This commit is contained in:
Chocobozzz 2016-06-18 16:13:54 +02:00
parent b2e4c0ba1a
commit 528a9efa82
20 changed files with 493 additions and 439 deletions

View File

@ -16,7 +16,7 @@ import 'rxjs/Observable';
import 'rxjs/Subject';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/observable/throw';
import 'bootstrap-loader';
import 'ng2-file-upload';

View File

@ -5,12 +5,12 @@ const express = require('express')
const router = express.Router()
const podsController = require('./pods')
const remoteVideosController = require('./remoteVideos')
const remoteController = require('./remote')
const usersController = require('./users')
const videosController = require('./videos')
router.use('/pods', podsController)
router.use('/remotevideos', remoteVideosController)
router.use('/remote', remoteController)
router.use('/users', usersController)
router.use('/videos', videosController)
router.use('/*', badRequest)

View File

@ -9,19 +9,18 @@ const middlewares = require('../../../middlewares')
const Pods = require('../../../models/pods')
const oAuth2 = middlewares.oauth2
const reqValidator = middlewares.reqValidators.pods
const secureMiddleware = middlewares.secure
const secureRequest = middlewares.reqValidators.remote.secureRequest
const signatureValidator = middlewares.reqValidators.remote.signature
const videos = require('../../../lib/videos')
const Videos = require('../../../models/videos')
const router = express.Router()
router.get('/', listPods)
router.get('/', listPodsUrl)
router.post('/', reqValidator.podsAdd, addPods)
router.get('/makefriends', oAuth2.authenticate, reqValidator.makeFriends, makeFriends)
router.get('/quitfriends', oAuth2.authenticate, quitFriends)
// Post because this is a secured request
router.post('/remove', secureRequest, secureMiddleware.decryptBody, removePods)
router.post('/remove', signatureValidator, removePods)
// ---------------------------------------------------------------------------
@ -30,22 +29,17 @@ module.exports = router
// ---------------------------------------------------------------------------
function addPods (req, res, next) {
const informations = req.body.data
const informations = req.body
async.waterfall([
function addPod (callback) {
Pods.add(informations, function (err) {
return callback(err)
})
Pods.add(informations, callback)
},
function createVideosOfThisPod (callback) {
// Create the remote videos from the new pod
videos.createRemoteVideos(informations.videos, function (err) {
if (err) logger.error('Cannot create remote videos.', { error: err })
function sendMyVideos (podCreated, callback) {
friends.sendOwnedVideosToPod(podCreated._id)
return callback(err)
})
callback(null)
},
function fetchMyCertificate (callback) {
@ -57,30 +51,19 @@ function addPods (req, res, next) {
return callback(null, cert)
})
},
function getListOfMyVideos (cert, callback) {
Videos.listOwned(function (err, videosList) {
if (err) {
logger.error('Cannot get the list of owned videos.')
return callback(err)
}
return callback(null, cert, videosList)
})
}
], function (err, cert, videosList) {
], function (err, cert) {
if (err) return next(err)
return res.json({ cert: cert, videos: videosList })
return res.json({ cert: cert })
})
}
function listPods (req, res, next) {
Pods.list(function (err, podsList) {
function listPodsUrl (req, res, next) {
Pods.listAllUrls(function (err, podsUrlList) {
if (err) return next(err)
res.json(podsList)
res.json(podsUrlList)
})
}

View File

@ -0,0 +1,80 @@
'use strict'
const async = require('async')
const express = require('express')
const middlewares = require('../../../middlewares')
const secureMiddleware = middlewares.secure
const reqValidator = middlewares.reqValidators.remote
const logger = require('../../../helpers/logger')
const Videos = require('../../../models/videos')
const videos = require('../../../lib/videos')
const router = express.Router()
router.post('/videos',
reqValidator.signature,
reqValidator.dataToDecrypt,
secureMiddleware.decryptBody,
reqValidator.remoteVideos,
remoteVideos
)
// ---------------------------------------------------------------------------
module.exports = router
// ---------------------------------------------------------------------------
function remoteVideos (req, res, next) {
const requests = req.body.data
const fromUrl = req.body.signature.url
// We need to process in the same order to keep consistency
// TODO: optimization
async.eachSeries(requests, function (request, callbackEach) {
const video = request.data
if (request.type === 'add') {
addRemoteVideo(video, callbackEach)
} else if (request.type === 'remove') {
removeRemoteVideo(video, fromUrl, callbackEach)
}
})
// We don't need to keep the other pod waiting
return res.type('json').status(204).end()
}
function addRemoteVideo (videoToCreate, callback) {
videos.createRemoteVideos([ videoToCreate ], function (err, remoteVideos) {
if (err) {
logger.error('Cannot create remote videos.', { error: err })
// Don't break the process
}
return callback()
})
}
function removeRemoteVideo (videoToRemove, fromUrl, callback) {
const magnetUris = [ videoToRemove.magnetUri ]
// We need the list because we have to remove some other stuffs (thumbnail etc)
Videos.listFromUrlAndMagnets(fromUrl, magnetUris, function (err, videosList) {
if (err) {
logger.error('Cannot list videos from url and magnets.', { error: err })
// Don't break the process
return callback()
}
videos.removeRemoteVideos(videosList, function (err) {
if (err) {
logger.error('Cannot remove remote videos.', { error: err })
// Don't break the process
}
return callback()
})
})
}

View File

@ -1,66 +0,0 @@
'use strict'
const express = require('express')
const map = require('lodash/map')
const middlewares = require('../../../middlewares')
const secureMiddleware = middlewares.secure
const reqValidator = middlewares.reqValidators.remote
const logger = require('../../../helpers/logger')
const Videos = require('../../../models/videos')
const videos = require('../../../lib/videos')
const router = express.Router()
router.post('/add',
reqValidator.secureRequest,
secureMiddleware.decryptBody,
reqValidator.remoteVideosAdd,
addRemoteVideos
)
router.post('/remove',
reqValidator.secureRequest,
secureMiddleware.decryptBody,
reqValidator.remoteVideosRemove,
removeRemoteVideo
)
// ---------------------------------------------------------------------------
module.exports = router
// ---------------------------------------------------------------------------
function addRemoteVideos (req, res, next) {
const videosToCreate = req.body.data
videos.createRemoteVideos(videosToCreate, function (err, remoteVideos) {
if (err) {
logger.error('Cannot create remote videos.', { error: err })
return next(err)
}
res.type('json').status(201).end()
})
}
function removeRemoteVideo (req, res, next) {
const fromUrl = req.body.signature.url
const magnetUris = map(req.body.data, 'magnetUri')
Videos.listFromUrlAndMagnets(fromUrl, magnetUris, function (err, videosList) {
if (err) {
logger.error('Cannot list videos from url and magnets.', { error: err })
return next(err)
}
videos.removeRemoteVideos(videosList, function (err) {
if (err) {
logger.error('Cannot remove remote videos.', { error: err })
return next(err)
}
res.type('json').status(204).end()
})
})
}

View File

@ -3,8 +3,6 @@
const async = require('async')
const config = require('config')
const express = require('express')
const fs = require('fs')
const path = require('path')
const multer = require('multer')
const constants = require('../../../initializers/constants')
@ -46,7 +44,6 @@ const storage = multer.diskStorage({
})
const reqFiles = multer({ storage: storage }).fields([{ name: 'videofile', maxCount: 1 }])
const thumbnailsDir = path.join(__dirname, '..', '..', '..', '..', config.get('storage.thumbnails'))
router.get('/',
reqValidatorPagination.pagination,
@ -127,34 +124,25 @@ function addVideo (req, res, next) {
return callback(err)
}
return callback(null, torrent, thumbnailName, videoData, insertedVideo)
return callback(null, insertedVideo)
})
},
function getThumbnailBase64 (torrent, thumbnailName, videoData, insertedVideo, callback) {
videoData.createdDate = insertedVideo.createdDate
fs.readFile(thumbnailsDir + thumbnailName, function (err, thumbnailData) {
function sendToFriends (insertedVideo, callback) {
videos.convertVideoToRemote(insertedVideo, function (err, remoteVideo) {
if (err) {
// TODO unseed the video
// TODO remove thumbnail
// TODO: remove video
logger.error('Cannot read the thumbnail of the video')
// TODO delete from DB
logger.error('Cannot convert video to remote.')
return callback(err)
}
return callback(null, videoData, thumbnailData)
// Now we'll add the video's meta data to our friends
friends.addVideoToFriends(remoteVideo)
return callback(null)
})
},
function sendToFriends (videoData, thumbnailData, callback) {
// Set the image in base64
videoData.thumbnailBase64 = new Buffer(thumbnailData).toString('base64')
// Now we'll add the video's meta data to our friends
friends.addVideoToFriends(videoData)
return callback(null)
}
], function andFinally (err) {

View File

@ -7,8 +7,7 @@ const VIDEOS_CONSTRAINTS_FIELDS = constants.VIDEOS_CONSTRAINTS_FIELDS
const customValidators = {
exists: exists,
isEachAddRemoteVideosValid: isEachAddRemoteVideosValid,
isEachRemoveRemoteVideosValid: isEachRemoveRemoteVideosValid,
isEachRemoteVideosValid: isEachRemoteVideosValid,
isArray: isArray,
isVideoAuthorValid: isVideoAuthorValid,
isVideoDateValid: isVideoDateValid,
@ -25,23 +24,26 @@ function exists (value) {
return value !== undefined && value !== null
}
function isEachAddRemoteVideosValid (videos) {
return videos.every(function (video) {
return isVideoAuthorValid(video.author) &&
isVideoDateValid(video.createdDate) &&
isVideoDescriptionValid(video.description) &&
isVideoDurationValid(video.duration) &&
isVideoMagnetUriValid(video.magnetUri) &&
isVideoNameValid(video.name) &&
isVideoPodUrlValid(video.podUrl) &&
isVideoTagsValid(video.tags) &&
isVideoThumbnailValid(video.thumbnailBase64)
})
}
function isEachRemoveRemoteVideosValid (videos) {
return videos.every(function (video) {
return isVideoMagnetUriValid(video.magnetUri)
function isEachRemoteVideosValid (requests) {
return requests.every(function (request) {
const video = request.data
return (
isRequestTypeAddValid(request.type) &&
isVideoAuthorValid(video.author) &&
isVideoDateValid(video.createdDate) &&
isVideoDescriptionValid(video.description) &&
isVideoDurationValid(video.duration) &&
isVideoMagnetUriValid(video.magnetUri) &&
isVideoNameValid(video.name) &&
isVideoPodUrlValid(video.podUrl) &&
isVideoTagsValid(video.tags) &&
isVideoThumbnailValid(video.thumbnailBase64)
) ||
(
isRequestTypeRemoveValid(request.type) &&
isVideoNameValid(video.name) &&
isVideoMagnetUriValid(video.magnetUri)
)
})
}
@ -49,6 +51,14 @@ function isArray (value) {
return Array.isArray(value)
}
function isRequestTypeAddValid (value) {
return value === 'add'
}
function isRequestTypeRemoveValid (value) {
return value === 'remove'
}
function isVideoAuthorValid (value) {
return validator.isLength(value, VIDEOS_CONSTRAINTS_FIELDS.AUTHOR)
}

View File

@ -1,12 +1,10 @@
'use strict'
const async = require('async')
const config = require('config')
const request = require('request')
const replay = require('request-replay')
const request = require('request')
const constants = require('../initializers/constants')
const logger = require('./logger')
const peertubeCrypto = require('./peertubeCrypto')
const http = config.get('webserver.https') ? 'https' : 'http'
@ -14,93 +12,67 @@ const host = config.get('webserver.host')
const port = config.get('webserver.port')
const requests = {
makeMultipleRetryRequest: makeMultipleRetryRequest
makeRetryRequest: makeRetryRequest,
makeSecureRequest: makeSecureRequest
}
function makeMultipleRetryRequest (allData, pods, callbackEach, callback) {
if (!callback) {
callback = callbackEach
callbackEach = null
function makeRetryRequest (params, callback) {
replay(
request(params, callback),
{
retries: constants.RETRY_REQUESTS,
factor: 3,
maxTimeout: Infinity,
errorCodes: [ 'EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED' ]
}
)
}
function makeSecureRequest (params, callback) {
const myUrl = http + '://' + host + ':' + port
const requestParams = {
url: params.toPod.url + params.path
}
const url = http + '://' + host + ':' + port
let signature
// Add data with POST requst ?
if (params.method === 'POST') {
requestParams.json = {}
// Add signature if it is specified in the params
if (allData.method === 'POST' && allData.data && allData.sign === true) {
signature = peertubeCrypto.sign(url)
}
// Make a request for each pod
async.each(pods, function (pod, callbackEachAsync) {
function callbackEachRetryRequest (err, response, body, url, pod) {
if (callbackEach !== null) {
callbackEach(err, response, body, url, pod, function () {
callbackEachAsync()
})
} else {
callbackEachAsync()
// Add signature if it is specified in the params
if (params.sign === true) {
requestParams.json.signature = {
url: myUrl,
signature: peertubeCrypto.sign(myUrl)
}
}
const params = {
url: pod.url + allData.path,
method: allData.method
}
// Add data with POST requst ?
if (allData.method === 'POST' && allData.data) {
// Encrypt data ?
if (allData.encrypt === true) {
peertubeCrypto.encrypt(pod.publicKey, JSON.stringify(allData.data), function (err, encrypted) {
// If there are data informations
if (params.data) {
// Encrypt data
if (params.encrypt === true) {
peertubeCrypto.encrypt(params.toPod.publicKey, JSON.stringify(params.data), function (err, encrypted) {
if (err) return callback(err)
params.json = {
data: encrypted.data,
key: encrypted.key
}
requestParams.json.data = encrypted.data
requestParams.json.key = encrypted.key
makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest)
request.post(requestParams, callback)
})
} else {
params.json = { data: allData.data }
makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest)
// No encryption
requestParams.json.data = params.data
request.post(requestParams, callback)
}
} else {
makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest)
// No data
request.post(requestParams, callback)
}
}, callback)
} else {
request.get(requestParams, callback)
}
}
// ---------------------------------------------------------------------------
module.exports = requests
// ---------------------------------------------------------------------------
function makeRetryRequest (params, fromUrl, toPod, signature, callbackEach) {
// Append the signature
if (signature) {
params.json.signature = {
url: fromUrl,
signature: signature
}
}
logger.debug('Make retry requests to %s.', toPod.url)
replay(
request.post(params, function (err, response, body) {
callbackEach(err, response, body, params.url, toPod)
}),
{
retries: constants.REQUEST_RETRIES,
factor: 3,
maxTimeout: Infinity,
errorCodes: [ 'EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED' ]
}
).on('replay', function (replay) {
logger.info('Replaying request to %s. Request failed: %d %s. Replay number: #%d. Will retry in: %d ms.',
params.url, replay.error.code, replay.error.message, replay.number, replay.delay)
})
}

View File

@ -18,11 +18,11 @@ const PODS_SCORE = {
BONUS: 10
}
// Number of retries we make for the make retry requests (to friends...)
let REQUEST_RETRIES = 10
// Number of requests in parallel we can make
const REQUESTS_IN_PARALLEL = 10
// Different types or requests for the request scheduler module
const REQUEST_SCHEDULER_TYPE = [ 'add', 'remove' ]
// Number of requests to retry for replay requests module
const RETRY_REQUESTS = 5
// Sortable columns per schema
const SEARCHABLE_COLUMNS = {
@ -56,7 +56,6 @@ if (isTestInstance() === true) {
FRIEND_BASE_SCORE = 20
INTERVAL = 10000
VIDEOS_CONSTRAINTS_FIELDS.DURATION.max = 14
REQUEST_RETRIES = 2
}
// ---------------------------------------------------------------------------
@ -67,8 +66,8 @@ module.exports = {
INTERVAL: INTERVAL,
PAGINATION_COUNT_DEFAULT: PAGINATION_COUNT_DEFAULT,
PODS_SCORE: PODS_SCORE,
REQUEST_RETRIES: REQUEST_RETRIES,
REQUEST_SCHEDULER_TYPE: REQUEST_SCHEDULER_TYPE,
REQUESTS_IN_PARALLEL: REQUESTS_IN_PARALLEL,
RETRY_REQUESTS: RETRY_REQUESTS,
SEARCHABLE_COLUMNS: SEARCHABLE_COLUMNS,
SORTABLE_COLUMNS: SORTABLE_COLUMNS,
THUMBNAILS_SIZE: THUMBNAILS_SIZE,

View File

@ -24,15 +24,15 @@ const pods = {
getMyCertificate: getMyCertificate,
makeFriends: makeFriends,
quitFriends: quitFriends,
removeVideoToFriends: removeVideoToFriends
removeVideoToFriends: removeVideoToFriends,
sendOwnedVideosToPod: sendOwnedVideosToPod
}
function addVideoToFriends (video) {
// To avoid duplicates
const id = video.name + video.magnetUri
// ensure namePath is null
video.namePath = null
requestsScheduler.addRequest(id, 'add', video)
requestsScheduler.addRequest('add', video)
}
function hasFriends (callback) {
@ -60,7 +60,7 @@ function makeFriends (callback) {
const urls = config.get('network.friends')
async.each(urls, function (url, callbackEach) {
async.eachSeries(urls, function (url, callbackEach) {
computeForeignPodsList(url, podsScore, callbackEach)
}, function (err) {
if (err) return callback(err)
@ -78,7 +78,7 @@ function quitFriends (callback) {
// Stop pool requests
requestsScheduler.deactivate()
// Flush pool requests
requestsScheduler.forceSend()
requestsScheduler.flush()
async.waterfall([
function getPodsList (callbackAsync) {
@ -86,19 +86,25 @@ function quitFriends (callback) {
},
function announceIQuitMyFriends (pods, callbackAsync) {
const request = {
const requestParams = {
method: 'POST',
path: '/api/' + constants.API_VERSION + '/pods/remove',
sign: true,
encrypt: true,
data: {
url: 'me' // Fake data
}
sign: true
}
// Announce we quit them
requests.makeMultipleRetryRequest(request, pods, function (err) {
return callbackAsync(err)
// We don't care if the request fails
// The other pod will exclude us automatically after a while
async.eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
requestParams.toPod = pod
requests.makeSecureRequest(requestParams, callbackEach)
}, function (err) {
if (err) {
logger.error('Some errors while quitting friends.', { err: err })
// Don't stop the process
}
return callbackAsync()
})
},
@ -136,9 +142,28 @@ function quitFriends (callback) {
}
function removeVideoToFriends (video) {
// To avoid duplicates
const id = video.name + video.magnetUri
requestsScheduler.addRequest(id, 'remove', video)
requestsScheduler.addRequest('remove', video)
}
function sendOwnedVideosToPod (podId) {
Videos.listOwned(function (err, videosList) {
if (err) {
logger.error('Cannot get the list of videos we own.')
return
}
videosList.forEach(function (video) {
videos.convertVideoToRemote(video, function (err, remoteVideo) {
if (err) {
logger.error('Cannot convert video to remote.', { error: err })
// Don't break the process
return
}
requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo)
})
})
})
}
// ---------------------------------------------------------------------------
@ -148,18 +173,19 @@ module.exports = pods
// ---------------------------------------------------------------------------
function computeForeignPodsList (url, podsScore, callback) {
// Let's give 1 point to the pod we ask the friends list
podsScore[url] = 1
getForeignPodsList(url, function (err, foreignPodsList) {
if (err) return callback(err)
if (foreignPodsList.length === 0) return callback()
if (!foreignPodsList) foreignPodsList = []
// Let's give 1 point to the pod we ask the friends list
foreignPodsList.push({ url: url })
foreignPodsList.forEach(function (foreignPod) {
const foreignUrl = foreignPod.url
const foreignPodUrl = foreignPod.url
if (podsScore[foreignUrl]) podsScore[foreignUrl]++
else podsScore[foreignUrl] = 1
if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++
else podsScore[foreignPodUrl] = 1
})
callback()
@ -194,63 +220,43 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
// Flush pool requests
requestsScheduler.forceSend()
// Get the list of our videos to send to our new friends
Videos.listOwned(function (err, videosList) {
if (err) {
logger.error('Cannot get the list of videos we own.')
return callback(err)
}
const data = {
url: http + '://' + host + ':' + port,
publicKey: cert,
videos: videosList
}
requests.makeMultipleRetryRequest(
{ method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data },
podsList,
// Callback called after each request
function eachRequest (err, response, body, url, pod, callbackEachRequest) {
// We add the pod if it responded correctly with its public certificate
if (!err && response.statusCode === 200) {
Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
if (err) {
logger.error('Error with adding %s pod.', pod.url, { error: err })
return callbackEachRequest()
}
videos.createRemoteVideos(body.videos, function (err) {
if (err) {
logger.error('Error with adding videos of pod.', pod.url, { error: err })
return callbackEachRequest()
}
logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
return callbackEachRequest()
})
})
} else {
logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
return callbackEachRequest()
}
},
// Final callback, we've ended all the requests
function endRequests (err) {
// Now we made new friends, we can re activate the pool of requests
requestsScheduler.activate()
if (err) {
logger.error('There was some errors when we wanted to make friends.')
return callback(err)
}
logger.debug('makeRequestsToWinningPods finished.')
return callback(null)
async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
const params = {
url: pod.url + '/api/' + constants.API_VERSION + '/pods/',
method: 'POST',
json: {
url: http + '://' + host + ':' + port,
publicKey: cert
}
)
}
requests.makeRetryRequest(params, function (err, res, body) {
if (err) {
logger.error('Error with adding %s pod.', pod.url, { error: err })
// Don't break the process
return callbackEach()
}
if (res.statusCode === 200) {
Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err, podCreated) {
if (err) logger.error('Cannot add friend %s pod.', pod.url)
// Add our videos to the request scheduler
sendOwnedVideosToPod(podCreated._id)
return callbackEach()
})
} else {
logger.error('Status not 200 for %s pod.', pod.url)
return callbackEach()
}
})
}, function endRequests () {
// Final callback, we've ended all the requests
// Now we made new friends, we can re activate the pool of requests
requestsScheduler.activate()
logger.debug('makeRequestsToWinningPods finished.')
return callback()
})
}

View File

@ -11,13 +11,14 @@ const requests = require('../helpers/requests')
const videos = require('../lib/videos')
const Videos = require('../models/videos')
const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
let timer = null
const requestsScheduler = {
activate: activate,
addRequest: addRequest,
addRequestTo: addRequestTo,
deactivate: deactivate,
flush: flush,
forceSend: forceSend
}
@ -27,35 +28,37 @@ function activate () {
}
// Add request to the scheduler
function addRequest (id, type, request) {
logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })
function addRequest (type, data) {
logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
Requests.findById(id, function (err, entity) {
const request = {
type: type,
data: data
}
Pods.listAllIds(function (err, podIds) {
if (err) {
logger.error('Error when trying to find a request.', { error: err })
return // Abort
logger.debug('Cannot list pod ids.')
return
}
// If there were already a request with this id in the scheduler...
if (entity) {
if (entity.type === type) {
logger.error('Cannot insert two same requests.')
return // Abort
}
// No friends
if (!podIds) return
// Remove the request of the other type
Requests.removeRequestById(id, function (err) {
if (err) {
logger.error('Cannot remove a request.', { error: err })
return // Abort
}
})
} else {
Requests.create(id, type, request, function (err) {
if (err) logger.error('Cannot create a request.', { error: err })
return // Abort
})
}
Requests.create(request, podIds, function (err) {
if (err) logger.error('Cannot create a request.', { error: err })
})
})
}
function addRequestTo (podIds, type, data) {
const request = {
type: type,
data: data
}
Requests.create(request, podIds, function (err) {
if (err) logger.error('Cannot create a request.', { error: err })
})
}
@ -64,6 +67,14 @@ function deactivate () {
clearInterval(timer)
}
function flush () {
Requests.removeAll(function (err) {
if (err) {
logger.error('Cannot flush the requests.', { error: err })
}
})
}
function forceSend () {
logger.info('Force requests scheduler sending.')
makeRequests()
@ -76,54 +87,28 @@ module.exports = requestsScheduler
// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
function makeRequest (type, requestsToMake, callback) {
function makeRequest (toPod, requestsToMake, callback) {
if (!callback) callback = function () {}
Pods.list(function (err, pods) {
if (err) return callback(err)
const params = {
toPod: toPod,
encrypt: true, // Security
sign: true, // To prove our identity
method: 'POST',
path: '/api/' + constants.API_VERSION + '/remote/videos',
data: requestsToMake // Requests we need to make
}
const params = {
encrypt: true, // Security
sign: true, // To prove our identity
method: 'POST',
path: null, // We build the path later
data: requestsToMake // Requests we need to make
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
requests.makeSecureRequest(params, function (err, res) {
if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })
return callback(false)
}
// If this is a valid type, we build the path
if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) {
params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type
} else {
return callback(new Error('Unkown pool request type.'))
}
const badPods = []
const goodPods = []
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
// We failed the request, add the pod unreachable to the bad pods list
if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
badPods.push(pod._id)
logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
} else {
// Request success
goodPods.push(pod._id)
}
return callbackEachPodFinished()
}
function callbackAllPodsFinished (err) {
if (err) return callback(err)
// All the requests were made, we update the pods score
updatePodsScore(goodPods, badPods)
callback(null)
}
return callback(true)
})
}
@ -143,38 +128,65 @@ function makeRequests () {
logger.info('Making requests to friends.')
// Requests by pods id
const requestsToMake = {}
for (const type of REQUEST_SCHEDULER_TYPE) {
requestsToMake[type] = {
ids: [],
requests: []
}
}
// For each requests to make, we add it to the correct request type
requests.forEach(function (poolRequest) {
if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) {
const requestTypeToMake = requestsToMake[poolRequest.type]
requestTypeToMake.requests.push(poolRequest.request)
requestTypeToMake.ids.push(poolRequest._id)
} else {
logger.error('Unkown request type.', { request_type: poolRequest.type })
return // abort
}
poolRequest.to.forEach(function (toPodId) {
if (!requestsToMake[toPodId]) {
requestsToMake[toPodId] = {
ids: [],
datas: []
}
}
requestsToMake[toPodId].ids.push(poolRequest._id)
requestsToMake[toPodId].datas.push(poolRequest.request)
})
})
for (let type of Object.keys(requestsToMake)) {
const requestTypeToMake = requestsToMake[type]
// If there are requests for this type
if (requestTypeToMake.requests.length !== 0) {
makeRequest(type, requestTypeToMake.requests, function (err) {
if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })
const goodPods = []
const badPods = []
// We made the requests, so we can remove them from the scheduler
Requests.removeRequests(requestTypeToMake.ids)
async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
const requestToMake = requestsToMake[toPodId]
// FIXME: mongodb request inside a loop :/
Pods.findById(toPodId, function (err, toPod) {
if (err) return logger.error('Error finding pod by id.', { err: err })
// Maybe the pod is not our friend anymore so simply remove them
if (!toPod) {
Requests.removePodOf(requestToMake.ids, toPodId)
return callbackEach()
}
makeRequest(toPod, requestToMake.datas, function (success) {
if (err) {
logger.error('Errors when sent request to %s.', toPod.url, { error: err })
// Do not stop the process just for one error
return callbackEach()
}
if (success === true) {
logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
// Remove the pod id of these request ids
Requests.removePodOf(requestToMake.ids, toPodId)
goodPods.push(toPodId)
} else {
badPods.push(toPodId)
}
callbackEach()
})
}
}
})
}, function () {
// All the requests were made, we update the pods score
updatePodsScore(goodPods, badPods)
// Flush requests with no pod
Requests.removeWithEmptyTo()
})
})
}

View File

@ -17,6 +17,7 @@ const uploadDir = pathUtils.join(__dirname, '..', '..', config.get('storage.uplo
const thumbnailsDir = pathUtils.join(__dirname, '..', '..', config.get('storage.thumbnails'))
const videos = {
convertVideoToRemote: convertVideoToRemote,
createRemoteVideos: createRemoteVideos,
getVideoDuration: getVideoDuration,
getVideoState: getVideoState,
@ -27,6 +28,29 @@ const videos = {
seedAllExisting: seedAllExisting
}
function convertVideoToRemote (video, callback) {
fs.readFile(thumbnailsDir + video.thumbnail, function (err, thumbnailData) {
if (err) {
logger.error('Cannot read the thumbnail of the video')
return callback(err)
}
const remoteVideo = {
name: video.name,
description: video.description,
magnetUri: video.magnetUri,
author: video.author,
duration: video.duration,
thumbnailBase64: new Buffer(thumbnailData).toString('base64'),
tags: video.tags,
createdDate: video.createdDate,
podUrl: video.podUrl
}
return callback(null, remoteVideo)
})
}
function createRemoteVideos (videos, callback) {
// Create the remote videos from the new pod
createRemoteVideoObjects(videos, function (err, remoteVideos) {
@ -154,7 +178,8 @@ function createRemoteVideoObjects (videos, callback) {
podUrl: video.podUrl,
duration: video.duration,
thumbnail: thumbnailName,
tags: video.tags
tags: video.tags,
author: video.author
}
remoteVideos.push(params)

View File

@ -26,8 +26,10 @@ function makeFriends (req, res, next) {
}
function podsAdd (req, res, next) {
req.checkBody('data.url', 'Should have an url').notEmpty().isURL({ require_protocol: true })
req.checkBody('data.publicKey', 'Should have a public key').notEmpty()
req.checkBody('url', 'Should have an url').notEmpty().isURL({ require_protocol: true })
req.checkBody('publicKey', 'Should have a public key').notEmpty()
// TODO: check we don't have it already
logger.debug('Checking podsAdd parameters', { parameters: req.body })

View File

@ -4,36 +4,34 @@ const checkErrors = require('./utils').checkErrors
const logger = require('../../helpers/logger')
const reqValidatorsRemote = {
remoteVideosAdd: remoteVideosAdd,
remoteVideosRemove: remoteVideosRemove,
secureRequest: secureRequest
dataToDecrypt: dataToDecrypt,
remoteVideos: remoteVideos,
signature: signature
}
function remoteVideosAdd (req, res, next) {
function dataToDecrypt (req, res, next) {
req.checkBody('key', 'Should have a key').notEmpty()
req.checkBody('data', 'Should have data').notEmpty()
logger.debug('Checking dataToDecrypt parameters', { parameters: { keyLength: req.body.key.length, bodyLength: req.body.data.length } })
checkErrors(req, res, next)
}
function remoteVideos (req, res, next) {
req.checkBody('data').isArray()
req.checkBody('data').isEachAddRemoteVideosValid()
req.checkBody('data').isEachRemoteVideosValid()
logger.debug('Checking remoteVideosAdd parameters', { parameters: req.body })
checkErrors(req, res, next)
}
function remoteVideosRemove (req, res, next) {
req.checkBody('data').isArray()
req.checkBody('data').isEachRemoveRemoteVideosValid()
logger.debug('Checking remoteVideosRemove parameters', { parameters: req.body })
checkErrors(req, res, next)
}
function secureRequest (req, res, next) {
function signature (req, res, next) {
req.checkBody('signature.url', 'Should have a signature url').isURL()
req.checkBody('signature.signature', 'Should have a signature').notEmpty()
req.checkBody('key', 'Should have a key').notEmpty()
req.checkBody('data', 'Should have data').notEmpty()
logger.debug('Checking secureRequest parameters', { parameters: { data: req.body.data, keyLength: req.body.key.length } })
logger.debug('Checking signature parameters', { parameters: { signatureUrl: req.body.signature.url } })
checkErrors(req, res, next)
}

View File

@ -19,10 +19,13 @@ const PodsDB = mongoose.model('pods', podsSchema)
const Pods = {
add: add,
count: count,
findById: findById,
findByUrl: findByUrl,
findBadPods: findBadPods,
incrementScores: incrementScores,
list: list,
listAllIds: listAllIds,
listAllUrls: listAllUrls,
remove: remove,
removeAll: removeAll,
removeAllByIds: removeAllByIds
@ -48,6 +51,10 @@ function findBadPods (callback) {
PodsDB.find({ score: 0 }, callback)
}
function findById (id, callback) {
PodsDB.findById(id, callback)
}
function findByUrl (url, callback) {
PodsDB.findOne({ url: url }, callback)
}
@ -68,6 +75,14 @@ function list (callback) {
})
}
function listAllIds (callback) {
return PodsDB.find({}, { _id: 1 }, callback)
}
function listAllUrls (callback) {
return PodsDB.find({}, { _id: 0, url: 1 }, callback)
}
function remove (url, callback) {
if (!callback) callback = function () {}
PodsDB.remove({ url: url }, callback)

View File

@ -7,9 +7,8 @@ const logger = require('../helpers/logger')
// ---------------------------------------------------------------------------
const requestsSchema = mongoose.Schema({
type: String,
id: String, // Special id to find duplicates (video created we want to remove...)
request: mongoose.Schema.Types.Mixed
request: mongoose.Schema.Types.Mixed,
to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ]
})
const RequestsDB = mongoose.model('requests', requestsSchema)
@ -19,12 +18,15 @@ const Requests = {
create: create,
findById: findById,
list: list,
removeAll: removeAll,
removePodOf: removePodOf,
removeRequestById: removeRequestById,
removeRequests: removeRequests
removeRequests: removeRequests,
removeWithEmptyTo: removeWithEmptyTo
}
function create (id, type, request, callback) {
RequestsDB.create({ id: id, type: type, request: request }, callback)
function create (request, to, callback) {
RequestsDB.create({ request: request, to: to }, callback)
}
function findById (id, callback) {
@ -32,7 +34,17 @@ function findById (id, callback) {
}
function list (callback) {
RequestsDB.find({}, { _id: 1, type: 1, request: 1 }, callback)
RequestsDB.find({}, { _id: 1, request: 1, to: 1 }, callback)
}
function removeAll (callback) {
RequestsDB.remove({ }, callback)
}
function removePodOf (requestsIds, podId, callback) {
if (!callback) callback = function () {}
RequestsDB.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
}
function removeRequestById (id, callback) {
@ -50,6 +62,12 @@ function removeRequests (ids) {
})
}
function removeWithEmptyTo (callback) {
if (!callback) callback = function () {}
RequestsDB.remove({ to: { $size: 0 } }, callback)
}
// ---------------------------------------------------------------------------
module.exports = Requests

View File

@ -90,33 +90,27 @@ describe('Test parameters validator', function () {
it('Should fail without public key', function (done) {
const data = {
data: {
url: 'http://coucou.com'
}
url: 'http://coucou.com'
}
makePostBodyRequest(path, data, done)
})
it('Should fail without an url', function (done) {
const data = {
data: {
publicKey: 'mysuperpublickey'
}
publicKey: 'mysuperpublickey'
}
makePostBodyRequest(path, data, done)
})
it('Should fail with an incorrect url', function (done) {
const data = {
data: {
url: 'coucou.com',
publicKey: 'mysuperpublickey'
}
url: 'coucou.com',
publicKey: 'mysuperpublickey'
}
makePostBodyRequest(path, data, function () {
data.data.url = 'http://coucou'
data.url = 'http://coucou'
makePostBodyRequest(path, data, function () {
data.data.url = 'coucou'
data.url = 'coucou'
makePostBodyRequest(path, data, done)
})
})
@ -124,10 +118,8 @@ describe('Test parameters validator', function () {
it('Should succeed with the correct parameters', function (done) {
const data = {
data: {
url: 'http://coucou.com',
publicKey: 'mysuperpublickey'
}
url: 'http://coucou.com',
publicKey: 'mysuperpublickey'
}
makePostBodyRequest(path, data, done, false)
})

View File

@ -130,6 +130,18 @@ describe('Test advanced friends', function () {
function (next) {
makeFriends(4, next)
},
// Check the pods 1, 2, 3 and 4 are friends
function (next) {
async.each([ 1, 2, 3, 4 ], function (i, callback) {
getFriendsList(i, function (err, res) {
if (err) throw err
expect(res.body.length).to.equal(3)
callback()
})
}, next)
},
// Kill pod 4
function (next) {
servers[3].app.kill()
@ -152,7 +164,7 @@ describe('Test advanced friends', function () {
uploadVideo(2, next)
},
function (next) {
setTimeout(next, 20000)
setTimeout(next, 11000)
},
// Rerun server 4
function (next) {
@ -173,6 +185,9 @@ describe('Test advanced friends', function () {
// Pod 6 ask pod 1, 2 and 3
function (next) {
makeFriends(6, next)
},
function (next) {
setTimeout(next, 11000)
}],
function (err) {
if (err) throw err
@ -247,7 +262,7 @@ describe('Test advanced friends', function () {
done()
})
}, 5000)
}, 11000)
})
})

View File

@ -25,9 +25,10 @@ describe('Test basic friends', function () {
if (err) throw err
const result = res.body
const resultUrls = [ result[0].url, result[1].url ]
expect(result).to.be.an('array')
expect(result.length).to.equal(2)
const resultUrls = [ result[0].url, result[1].url ]
expect(resultUrls[0]).to.not.equal(resultUrls[1])
const errorString = 'Friends url do not correspond for ' + serverToTest.url

View File

@ -105,6 +105,7 @@ describe('Test multiple pods', function () {
expect(video.duration).to.equal(10)
expect(video.tags).to.deep.equal([ 'tag1p1', 'tag2p1' ])
expect(utils.dateIsValid(video.createdDate)).to.be.true
expect(video.author).to.equal('root')
if (server.url !== 'http://localhost:9001') {
expect(video.isLocal).to.be.false
@ -166,6 +167,7 @@ describe('Test multiple pods', function () {
expect(video.duration).to.equal(5)
expect(video.tags).to.deep.equal([ 'tag1p2', 'tag2p2', 'tag3p2' ])
expect(utils.dateIsValid(video.createdDate)).to.be.true
expect(video.author).to.equal('root')
if (server.url !== 'http://localhost:9002') {
expect(video.isLocal).to.be.false
@ -243,6 +245,7 @@ describe('Test multiple pods', function () {
expect(video1.magnetUri).to.exist
expect(video1.duration).to.equal(5)
expect(video1.tags).to.deep.equal([ 'tag1p3' ])
expect(video1.author).to.equal('root')
expect(utils.dateIsValid(video1.createdDate)).to.be.true
expect(video2.name).to.equal('my super name for pod 3-2')
@ -251,6 +254,7 @@ describe('Test multiple pods', function () {
expect(video2.magnetUri).to.exist
expect(video2.duration).to.equal(5)
expect(video2.tags).to.deep.equal([ 'tag2p3', 'tag3p3', 'tag4p3' ])
expect(video2.author).to.equal('root')
expect(utils.dateIsValid(video2.createdDate)).to.be.true
if (server.url !== 'http://localhost:9003') {