Split models

This commit is contained in:
Chocobozzz 2016-02-04 21:10:33 +01:00
parent c45f7f8400
commit c173e56520
15 changed files with 638 additions and 517 deletions

View File

@ -2,18 +2,23 @@
'use strict'
var express = require('express')
var fs = require('fs')
var logger = require('../../../helpers/logger')
var friends = require('../../../lib/friends')
var middleware = require('../../../middlewares')
var miscMiddleware = middleware.misc
var pods = require('../../../models/pods')
var Pods = require('../../../models/pods')
var reqValidator = middleware.reqValidators.pods
var secureRequest = middleware.reqValidators.remote.secureRequest
var utils = require('../../../helpers/utils')
var Videos = require('../../../models/videos')
var router = express.Router()
router.get('/', miscMiddleware.cache(false), listPods)
router.post('/', reqValidator.podsAdd, miscMiddleware.cache(false), addPods)
router.get('/makefriends', miscMiddleware.cache(false), makeFriends)
router.get('/makefriends', reqValidator.makeFriends, miscMiddleware.cache(false), makeFriends)
router.get('/quitfriends', miscMiddleware.cache(false), quitFriends)
// Post because this is a secured request
router.post('/remove', secureRequest, miscMiddleware.decryptBody, removePods)
@ -25,15 +30,32 @@
// ---------------------------------------------------------------------------
function addPods (req, res, next) {
pods.add(req.body.data, function (err, json) {
var informations = req.body.data
Pods.add(informations, function (err) {
if (err) return next(err)
res.json(json)
Videos.addRemotes(informations.videos)
fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) {
if (err) {
logger.error('Cannot read cert file.', { error: err })
return next(err)
}
Videos.listOwned(function (err, videos_list) {
if (err) {
logger.error('Cannot get the list of owned videos.', { error: err })
return next(err)
}
res.json({ cert: cert, videos: videos_list })
})
})
})
}
function listPods (req, res, next) {
pods.list(function (err, pods_list) {
Pods.list(function (err, pods_list) {
if (err) return next(err)
res.json(pods_list)
@ -41,32 +63,28 @@
}
function makeFriends (req, res, next) {
pods.hasFriends(function (err, has_friends) {
if (err) return next(err)
if (has_friends === true) {
// We need to quit our friends before make new ones
res.sendStatus(409)
} else {
pods.makeFriends(function (err) {
if (err) return next(err)
res.sendStatus(204)
})
}
})
}
function removePods (req, res, next) {
pods.remove(req.body.signature.url, function (err) {
friends.makeFriends(function (err) {
if (err) return next(err)
res.sendStatus(204)
})
}
function removePods (req, res, next) {
var url = req.body.signature.url
Pods.remove(url, function (err) {
if (err) return next(err)
Videos.removeAllRemotesOf(url, function (err) {
if (err) logger.error('Cannot remove all remote videos of %s.', url)
logger.info('%s pod removed.', url)
res.sendStatus(204)
})
})
}
function quitFriends (req, res, next) {
pods.quitFriends(function (err) {
friends.quitFriends(function (err) {
if (err) return next(err)
res.sendStatus(204)

View File

@ -42,7 +42,10 @@
}
function removeRemoteVideo (req, res, next) {
videos.removeRemotes(req.body.signature.url, pluck(req.body.data, 'magnetUri'), function (err) {
var url = req.body.signature.url
var magnetUris = pluck(req.body.data, 'magnetUri')
videos.removeRemotesOfByMagnetUris(url, magnetUris, function (err) {
if (err) return next(err)
res.sendStatus(204)

View File

@ -6,10 +6,14 @@
var express = require('express')
var multer = require('multer')
var logger = require('../../../helpers/logger')
var friends = require('../../../lib/friends')
var middleware = require('../../../middlewares')
var miscMiddleware = middleware.misc
var reqValidator = middleware.reqValidators.videos
var videos = require('../../../models/videos')
var Videos = require('../../../models/videos') // model
var videos = require('../../../lib/videos')
var webtorrent = require('../../../lib/webTorrentNode')
var router = express.Router()
var uploads = config.get('storage.uploads')
@ -35,7 +39,7 @@
var reqFiles = multer({ storage: storage }).fields([{ name: 'input_video', maxCount: 1 }])
router.get('/', miscMiddleware.cache(false), listVideos)
router.post('/', reqFiles, reqValidator.videosAdd, miscMiddleware.cache(false), addVideos)
router.post('/', reqFiles, reqValidator.videosAdd, miscMiddleware.cache(false), addVideo)
router.get('/:id', reqValidator.videosGet, miscMiddleware.cache(false), getVideos)
router.delete('/:id', reqValidator.videosRemove, miscMiddleware.cache(false), removeVideo)
router.get('/search/:name', reqValidator.videosSearch, miscMiddleware.cache(false), searchVideos)
@ -46,17 +50,41 @@
// ---------------------------------------------------------------------------
function addVideos (req, res, next) {
videos.add({ video: req.files.input_video[0], data: req.body }, function (err) {
if (err) return next(err)
function addVideo (req, res, next) {
var video_file = req.files.input_video[0]
var video_infos = req.body
// TODO : include Location of the new video
res.sendStatus(201)
videos.seed(video_file.path, function (err, torrent) {
if (err) {
logger.error('Cannot seed this video.', { error: err })
return next(err)
}
var video_data = {
name: video_infos.name,
namePath: video_file.filename,
description: video_infos.description,
magnetUri: torrent.magnetURI
}
Videos.add(video_data, function (err) {
if (err) {
// TODO unseed the video
logger.error('Cannot insert this video in the database.', { error: err })
return next(err)
}
// Now we'll add the video's meta data to our friends
friends.addVideoToFriends(video_data)
// TODO : include Location of the new video
res.sendStatus(201)
})
})
}
function getVideos (req, res, next) {
videos.get(req.params.id, function (err, video) {
Videos.get(req.params.id, function (err, video) {
if (err) return next(err)
if (video === null) {
@ -68,7 +96,7 @@
}
function listVideos (req, res, next) {
videos.list(function (err, videos_list) {
Videos.list(function (err, videos_list) {
if (err) return next(err)
res.json(videos_list)
@ -76,18 +104,43 @@
}
function removeVideo (req, res, next) {
videos.remove(req.params.id, function (err) {
var video_id = req.params.id
Videos.get(video_id, function (err, video) {
if (err) return next(err)
res.sendStatus(204)
removeTorrent(video.magnetUri, function () {
Videos.removeOwned(req.params.id, function (err) {
if (err) return next(err)
var params = {
name: video.name,
magnetUri: video.magnetUri
}
friends.removeVideoToFriends(params)
res.sendStatus(204)
})
})
})
}
function searchVideos (req, res, next) {
videos.search(req.params.name, function (err, videos_list) {
Videos.search(req.params.name, function (err, videos_list) {
if (err) return next(err)
res.json(videos_list)
})
}
// ---------------------------------------------------------------------------
// Maybe the torrent is not seeded, but we catch the error to don't stop the removing process
function removeTorrent (magnetUri, callback) {
try {
webtorrent.remove(magnetUri, callback)
} catch (err) {
logger.warn('Cannot remove the torrent from WebTorrent', { err: err })
return callback(null)
}
}
})()

View File

@ -4,59 +4,29 @@
var config = require('config')
var mongoose = require('mongoose')
var constants = require('./constants')
var logger = require('../helpers/logger')
var dbname = 'peertube' + config.get('database.suffix')
var host = config.get('database.host')
var port = config.get('database.port')
// ----------- Pods -----------
var podsSchema = mongoose.Schema({
url: String,
publicKey: String,
score: { type: Number, max: constants.FRIEND_BASE_SCORE }
})
var database = {
connect: connect
}
var PodsDB = mongoose.model('pods', podsSchema)
function connect () {
mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname)
mongoose.connection.on('error', function () {
logger.error('Mongodb connection error.')
process.exit(0)
})
// ----------- PoolRequests -----------
var poolRequestsSchema = mongoose.Schema({
type: String,
id: String, // Special id to find duplicates (video created we want to remove...)
request: mongoose.Schema.Types.Mixed
})
var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema)
// ----------- Videos -----------
var videosSchema = mongoose.Schema({
name: String,
namePath: String,
description: String,
magnetUri: String,
podUrl: String
})
var VideosDB = mongoose.model('videos', videosSchema)
mongoose.connection.on('open', function () {
logger.info('Connected to mongodb.')
})
}
// ---------------------------------------------------------------------------
module.exports = {
PodsDB: PodsDB,
PoolRequestsDB: PoolRequestsDB,
VideosDB: VideosDB
}
// ----------- Connection -----------
mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname)
mongoose.connection.on('error', function () {
logger.error('Mongodb connection error.')
process.exit(0)
})
mongoose.connection.on('open', function () {
logger.info('Connected to mongodb.')
})
module.exports = database
})()

218
lib/friends.js Normal file
View File

@ -0,0 +1,218 @@
;(function () {
'use strict'
var async = require('async')
var config = require('config')
var fs = require('fs')
var request = require('request')
var constants = require('../initializers/constants')
var logger = require('../helpers/logger')
var Pods = require('../models/pods')
var PoolRequests = require('../models/poolRequests')
var poolRequests = require('../lib/poolRequests')
var utils = require('../helpers/utils')
var Videos = require('../models/videos')
var http = config.get('webserver.https') ? 'https' : 'http'
var host = config.get('webserver.host')
var port = config.get('webserver.port')
var pods = {
addVideoToFriends: addVideoToFriends,
hasFriends: hasFriends,
makeFriends: makeFriends,
quitFriends: quitFriends,
removeVideoToFriends: removeVideoToFriends
}
function addVideoToFriends (video) {
// To avoid duplicates
var id = video.name + video.magnetUri
// namePath is null
// TODO
video.namePath = null
PoolRequests.addRequest(id, 'add', video)
}
function hasFriends (callback) {
Pods.count(function (err, count) {
if (err) return callback(err)
var has_friends = (count !== 0)
callback(null, has_friends)
})
}
function makeFriends (callback) {
var pods_score = {}
logger.info('Make friends!')
fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) {
if (err) {
logger.error('Cannot read public cert.', { error: err })
return callback(err)
}
var urls = config.get('network.friends')
async.each(urls, computeForeignPodsList, function () {
logger.debug('Pods scores computed.', { pods_score: pods_score })
var pods_list = computeWinningPods(urls, pods_score)
logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list })
makeRequestsToWinningPods(cert, pods_list)
})
})
// -----------------------------------------------------------------------
function computeForeignPodsList (url, callback) {
// Let's give 1 point to the pod we ask the friends list
pods_score[url] = 1
getForeignPodsList(url, function (foreign_pods_list) {
if (foreign_pods_list.length === 0) return callback()
async.each(foreign_pods_list, function (foreign_pod, callback_each) {
var foreign_url = foreign_pod.url
if (pods_score[foreign_url]) pods_score[foreign_url]++
else pods_score[foreign_url] = 1
callback_each()
}, function () {
callback()
})
})
}
function computeWinningPods (urls, pods_score) {
// Build the list of pods to add
// Only add a pod if it exists in more than a half base pods
var pods_list = []
var base_score = urls.length / 2
Object.keys(pods_score).forEach(function (pod) {
if (pods_score[pod] > base_score) pods_list.push({ url: pod })
})
return pods_list
}
function makeRequestsToWinningPods (cert, pods_list) {
// Stop pool requests
poolRequests.deactivate()
// Flush pool requests
poolRequests.forceSend()
// Get the list of our videos to send to our new friends
Videos.listOwned(function (err, videos_list) {
if (err) throw err
var data = {
url: http + '://' + host + ':' + port,
publicKey: cert,
videos: videos_list
}
utils.makeMultipleRetryRequest(
{ method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data },
pods_list,
function eachRequest (err, response, body, url, pod, callback_each_request) {
// We add the pod if it responded correctly with its public certificate
if (!err && response.statusCode === 200) {
Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
if (err) logger.error('Error with adding %s pod.', pod.url, { error: err })
Videos.addRemotes(body.videos, function (err) {
if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err })
logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
return callback_each_request()
})
})
} else {
logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
return callback_each_request()
}
},
function endRequests (err) {
// Now we made new friends, we can re activate the pool of requests
poolRequests.activate()
if (err) {
logger.error('There was some errors when we wanted to make friends.', { error: err })
return callback(err)
}
logger.debug('makeRequestsToWinningPods finished.')
return callback(null)
}
)
})
}
}
function quitFriends (callback) {
// Stop pool requests
poolRequests.deactivate()
// Flush pool requests
poolRequests.forceSend()
Pods.list(function (err, pods) {
if (err) return callback(err)
var request = {
method: 'POST',
path: '/api/' + constants.API_VERSION + '/pods/remove',
sign: true,
encrypt: true,
data: {
url: 'me' // Fake data
}
}
// Announce we quit them
utils.makeMultipleRetryRequest(request, pods, function () {
Pods.removeAll(function (err) {
poolRequests.activate()
if (err) return callback(err)
logger.info('Broke friends, so sad :(')
Videos.removeAllRemotes(function (err) {
if (err) return callback(err)
logger.info('Removed all remote videos.')
callback(null)
})
})
})
})
}
function removeVideoToFriends (video) {
// To avoid duplicates
var id = video.name + video.magnetUri
PoolRequests.addRequest(id, 'remove', video)
}
// ---------------------------------------------------------------------------
module.exports = pods
// ---------------------------------------------------------------------------
function getForeignPodsList (url, callback) {
var path = '/api/' + constants.API_VERSION + '/pods'
request.get(url + path, function (err, response, body) {
if (err) throw err
callback(JSON.parse(body))
})
}
})()

View File

@ -5,18 +5,16 @@
var pluck = require('lodash-node/compat/collection/pluck')
var constants = require('../initializers/constants')
var database = require('../initializers/database')
var logger = require('../helpers/logger')
var PodsDB = database.PodsDB
var PoolRequestsDB = database.PoolRequestsDB
var Pods = require('../models/pods')
var PoolRequests = require('../models/poolRequests')
var utils = require('../helpers/utils')
var VideosDB = database.VideosDB
var Videos = require('../models/videos')
var timer = null
var poolRequests = {
activate: activate,
addToPoolRequests: addToPoolRequests,
deactivate: deactivate,
forceSend: forceSend
}
@ -36,30 +34,6 @@
timer = setInterval(makePoolRequests, constants.INTERVAL)
}
function addToPoolRequests (id, type, request) {
logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
PoolRequestsDB.findOne({ id: id }, function (err, entity) {
if (err) logger.error(err)
if (entity) {
if (entity.type === type) {
logger.error(new Error('Cannot insert two same requests.'))
return
}
// Remove the request of the other type
PoolRequestsDB.remove({ id: id }, function (err) {
if (err) logger.error(err)
})
} else {
PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
if (err) logger.error(err)
})
}
})
}
// ---------------------------------------------------------------------------
module.exports = poolRequests
@ -69,7 +43,7 @@
function makePoolRequest (type, requests, callback) {
if (!callback) callback = function () {}
PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
Pods.list(function (err, pods) {
if (err) throw err
var params = {
@ -116,7 +90,7 @@
function makePoolRequests () {
logger.info('Making pool requests to friends.')
PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) {
PoolRequests.list(function (err, pool_requests) {
if (err) throw err
if (pool_requests.length === 0) return
@ -150,7 +124,7 @@
makePoolRequest('add', requests.add.requests, function (err) {
if (err) logger.error('Errors when sent add pool requests.', { error: err })
removePoolRequestsFromDB(requests.add.ids)
PoolRequests.removeRequests(requests.add.ids)
})
}
@ -159,7 +133,7 @@
makePoolRequest('remove', requests.remove.requests, function (err) {
if (err) logger.error('Errors when sent remove pool requests.', { error: err })
removePoolRequestsFromDB(requests.remove.ids)
PoolRequests.removeRequests(requests.remove.ids)
})
}
})
@ -167,7 +141,7 @@
}
function removeBadPods () {
PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) {
Pods.findBadPods(function (err, pods) {
if (err) throw err
if (pods.length === 0) return
@ -175,12 +149,12 @@
var urls = pluck(pods, 'url')
var ids = pluck(pods, '_id')
VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) {
Videos.removeAllRemotesOf(urls, function (err, r) {
if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err })
var videos_removed = r.result.n
logger.info('Removed %d videos.', videos_removed)
PodsDB.remove({ _id: { $in: ids } }, function (err, r) {
Pods.removeAllByIds(ids, function (err, r) {
if (err) logger.error('Cannot remove bad pods.', { error: err })
var pods_removed = r.result.n
@ -190,22 +164,11 @@
})
}
function removePoolRequestsFromDB (ids) {
PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) {
if (err) {
logger.error('Cannot remove requests from the pool requests database.', { error: err })
return
}
logger.info('Pool requests flushed.')
})
}
function updatePodsScore (good_pods, bad_pods) {
logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec()
PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) {
Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS)
Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
if (err) throw err
removeBadPods()
})

51
lib/videos.js Normal file
View File

@ -0,0 +1,51 @@
;(function () {
'use strict'
var async = require('async')
var config = require('config')
var webtorrent = require('../lib/webTorrentNode')
var logger = require('../helpers/logger')
var Videos = require('../models/videos')
var uploadDir = __dirname + '/../' + config.get('storage.uploads')
var videos = {
seed: seed,
seedAllExisting: seedAllExisting
}
function seed (path, callback) {
logger.info('Seeding %s...', path)
webtorrent.seed(path, function (torrent) {
logger.info('%s seeded (%s).', path, torrent.magnetURI)
return callback(null, torrent)
})
}
function seedAllExisting (callback) {
Videos.listOwned(function (err, videos_list) {
if (err) {
logger.error('Cannot get list of the videos to seed.', { error: err })
return callback(err)
}
async.each(videos_list, function (video, each_callback) {
seed(uploadDir + video.namePath, function (err) {
if (err) {
logger.error('Cannot seed this video.', { error: err })
return callback(err)
}
each_callback(null)
})
}, callback)
})
}
// ---------------------------------------------------------------------------
module.exports = videos
})()

View File

@ -62,7 +62,7 @@
try {
wt.remove(magnetUri, callback)
} catch (err) {
console.log('Cannot remove the torrent from WebTorrent', { err: err })
console.log('Cannot remove the torrent from WebTorrent')
return callback(null)
}

View File

@ -5,7 +5,7 @@
var ursa = require('ursa')
var logger = require('../helpers/logger')
var PodsDB = require('../initializers/database').PodsDB
var Pods = require('../models/pods')
var utils = require('../helpers/utils')
var miscMiddleware = {
@ -28,18 +28,19 @@
}
function decryptBody (req, res, next) {
PodsDB.findOne({ url: req.body.signature.url }, function (err, pod) {
var url = req.body.signature.url
Pods.findByUrl(url, function (err, pod) {
if (err) {
logger.error('Cannot get signed url in decryptBody.', { error: err })
return res.sendStatus(500)
}
if (pod === null) {
logger.error('Unknown pod %s.', req.body.signature.url)
logger.error('Unknown pod %s.', url)
return res.sendStatus(403)
}
logger.debug('Decrypting body from %s.', req.body.signature.url)
logger.debug('Decrypting body from %s.', url)
var crt = ursa.createPublicKey(pod.publicKey)
var signature_ok = crt.hashAndVerify('sha256', new Buffer(req.body.signature.url).toString('hex'), req.body.signature.signature, 'hex')

View File

@ -2,12 +2,27 @@
'use strict'
var checkErrors = require('./utils').checkErrors
var friends = require('../../lib/friends')
var logger = require('../../helpers/logger')
var reqValidatorsPod = {
makeFriends: makeFriends,
podsAdd: podsAdd
}
function makeFriends (req, res, next) {
friends.hasFriends(function (err, has_friends) {
if (err) return next(err)
if (has_friends === true) {
// We need to quit our friends before make new ones
res.sendStatus(409)
} else {
next()
}
})
}
function podsAdd (req, res, next) {
req.checkBody('data.url', 'Should have an url').notEmpty().isURL({ require_protocol: true })
req.checkBody('data.publicKey', 'Should have a public key').notEmpty()

View File

@ -3,7 +3,7 @@
var checkErrors = require('./utils').checkErrors
var logger = require('../../helpers/logger')
var VideosDB = require('../../initializers/database').VideosDB
var Videos = require('../../models/videos')
var reqValidatorsVideos = {
videosAdd: videosAdd,
@ -29,8 +29,13 @@
logger.debug('Checking videosGet parameters', { parameters: req.params })
checkErrors(req, res, function () {
findVideoById(req.params.id, function (video) {
if (!video) return res.status(404).send('Video not found')
Videos.getVideoState(req.params.id, function (err, state) {
if (err) {
logger.error('Error in videosGet request validator.', { error: err })
res.sendStatus(500)
}
if (state.exist === false) return res.status(404).send('Video not found')
next()
})
@ -43,9 +48,14 @@
logger.debug('Checking videosRemove parameters', { parameters: req.params })
checkErrors(req, res, function () {
findVideoById(req.params.id, function (video) {
if (!video) return res.status(404).send('Video not found')
else if (video.namePath === null) return res.status(403).send('Cannot remove video of another pod')
Videos.getVideoState(req.params.id, function (err, state) {
if (err) {
logger.error('Error in videosRemove request validator.', { error: err })
res.sendStatus(500)
}
if (state.exist === false) return res.status(404).send('Video not found')
else if (state.owned === false) return res.status(403).send('Cannot remove video of another pod')
next()
})
@ -63,14 +73,4 @@
// ---------------------------------------------------------------------------
module.exports = reqValidatorsVideos
// ---------------------------------------------------------------------------
function findVideoById (id, callback) {
VideosDB.findById(id, { _id: 1, namePath: 1 }).limit(1).exec(function (err, video) {
if (err) throw err
callback(video)
})
}
})()

View File

@ -1,73 +1,61 @@
;(function () {
'use strict'
var async = require('async')
var config = require('config')
var fs = require('fs')
var request = require('request')
var mongoose = require('mongoose')
var constants = require('../initializers/constants')
var logger = require('../helpers/logger')
var PodsDB = require('../initializers/database').PodsDB
var poolRequests = require('../lib/poolRequests')
var utils = require('../helpers/utils')
var http = config.get('webserver.https') ? 'https' : 'http'
var host = config.get('webserver.host')
var port = config.get('webserver.port')
// ---------------------------------------------------------------------------
var pods = {
var podsSchema = mongoose.Schema({
url: String,
publicKey: String,
score: { type: Number, max: constants.FRIEND_BASE_SCORE }
})
var PodsDB = mongoose.model('pods', podsSchema)
// ---------------------------------------------------------------------------
var Pods = {
add: add,
addVideoToFriends: addVideoToFriends,
count: count,
findByUrl: findByUrl,
findBadPods: findBadPods,
incrementScores: incrementScores,
list: list,
hasFriends: hasFriends,
makeFriends: makeFriends,
quitFriends: quitFriends,
remove: remove,
removeVideoToFriends
removeAll: removeAll,
removeAllByIds: removeAllByIds
}
// TODO: check if the pod is not already a friend
function add (data, callback) {
var videos = require('./videos')
logger.info('Adding pod: %s', data.url)
if (!callback) callback = function () {}
var params = {
url: data.url,
publicKey: data.publicKey,
score: constants.FRIEND_BASE_SCORE
}
PodsDB.create(params, function (err, pod) {
if (err) {
logger.error('Cannot insert the pod.', { error: err })
return callback(err)
}
videos.addRemotes(data.videos)
fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) {
if (err) {
logger.error('Cannot read cert file.', { error: err })
return callback(err)
}
videos.listOwned(function (err, videos_list) {
if (err) {
logger.error('Cannot get the list of owned videos.', { error: err })
return callback(err)
}
return callback(null, { cert: cert, videos: videos_list })
})
})
})
PodsDB.create(params, callback)
}
function addVideoToFriends (video) {
// To avoid duplicates
var id = video.name + video.magnetUri
poolRequests.addToPoolRequests(id, 'add', video)
function count (callback) {
return PodsDB.count(callback)
}
function findBadPods (callback) {
PodsDB.find({ score: 0 }, callback)
}
function findByUrl (url, callback) {
PodsDB.findOne({ url: url }, callback)
}
function incrementScores (ids, value, callback) {
if (!callback) callback = function () {}
PodsDB.update({ _id: { $in: ids } }, { $inc: { score: value } }, { multi: true }, callback)
}
function list (callback) {
@ -81,202 +69,22 @@
})
}
function hasFriends (callback) {
PodsDB.count(function (err, count) {
if (err) return callback(err)
var has_friends = (count !== 0)
callback(null, has_friends)
})
}
function makeFriends (callback) {
var videos = require('./videos')
var pods_score = {}
logger.info('Make friends!')
fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) {
if (err) {
logger.error('Cannot read public cert.', { error: err })
return callback(err)
}
var urls = config.get('network.friends')
async.each(urls, computeForeignPodsList, function () {
logger.debug('Pods scores computed.', { pods_score: pods_score })
var pods_list = computeWinningPods(urls, pods_score)
logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list })
makeRequestsToWinningPods(cert, pods_list)
})
})
// -----------------------------------------------------------------------
function computeForeignPodsList (url, callback) {
// Let's give 1 point to the pod we ask the friends list
pods_score[url] = 1
getForeignPodsList(url, function (foreign_pods_list) {
if (foreign_pods_list.length === 0) return callback()
async.each(foreign_pods_list, function (foreign_pod, callback_each) {
var foreign_url = foreign_pod.url
if (pods_score[foreign_url]) pods_score[foreign_url]++
else pods_score[foreign_url] = 1
callback_each()
}, function () {
callback()
})
})
}
function computeWinningPods (urls, pods_score) {
// Build the list of pods to add
// Only add a pod if it exists in more than a half base pods
var pods_list = []
var base_score = urls.length / 2
Object.keys(pods_score).forEach(function (pod) {
if (pods_score[pod] > base_score) pods_list.push({ url: pod })
})
return pods_list
}
function makeRequestsToWinningPods (cert, pods_list) {
// Stop pool requests
poolRequests.deactivate()
// Flush pool requests
poolRequests.forceSend()
// Get the list of our videos to send to our new friends
videos.listOwned(function (err, videos_list) {
if (err) throw err
var data = {
url: http + '://' + host + ':' + port,
publicKey: cert,
videos: videos_list
}
utils.makeMultipleRetryRequest(
{ method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data },
pods_list,
function eachRequest (err, response, body, url, pod, callback_each_request) {
// We add the pod if it responded correctly with its public certificate
if (!err && response.statusCode === 200) {
add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
if (err) logger.error('Error with adding %s pod.', pod.url, { error: err })
videos.addRemotes(body.videos, function (err) {
if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err })
logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
return callback_each_request()
})
})
} else {
logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
return callback_each_request()
}
},
function endRequests (err) {
// Now we made new friends, we can re activate the pool of requests
poolRequests.activate()
if (err) {
logger.error('There was some errors when we wanted to make friends.', { error: err })
return callback(err)
}
logger.debug('makeRequestsToWinningPods finished.')
return callback(null)
}
)
})
}
}
function quitFriends (callback) {
// Stop pool requests
poolRequests.deactivate()
// Flush pool requests
poolRequests.forceSend()
PodsDB.find(function (err, pods) {
if (err) return callback(err)
var request = {
method: 'POST',
path: '/api/' + constants.API_VERSION + '/pods/remove',
sign: true,
encrypt: true,
data: {
url: 'me' // Fake data
}
}
// Announce we quit them
utils.makeMultipleRetryRequest(request, pods, function () {
PodsDB.remove(function (err) {
poolRequests.activate()
if (err) return callback(err)
logger.info('Broke friends, so sad :(')
var videos = require('./videos')
videos.removeAllRemotes(function (err) {
if (err) return callback(err)
logger.info('Removed all remote videos.')
callback(null)
})
})
})
})
}
function remove (url, callback) {
var videos = require('./videos')
logger.info('Removing %s pod.', url)
videos.removeAllRemotesOf(url, function (err) {
if (err) logger.error('Cannot remove all remote videos of %s.', url)
PodsDB.remove({ url: url }, function (err) {
if (err) return callback(err)
logger.info('%s pod removed.', url)
callback(null)
})
})
if (!callback) callback = function () {}
PodsDB.remove({ url: url }, callback)
}
function removeVideoToFriends (video) {
// To avoid duplicates
var id = video.name + video.magnetUri
poolRequests.addToPoolRequests(id, 'remove', video)
function removeAll (callback) {
if (!callback) callback = function () {}
PodsDB.remove(callback)
}
function removeAllByIds (ids, callback) {
if (!callback) callback = function () {}
PodsDB.remove({ _id: { $in: ids } }, callback)
}
// ---------------------------------------------------------------------------
module.exports = pods
// ---------------------------------------------------------------------------
function getForeignPodsList (url, callback) {
var path = '/api/' + constants.API_VERSION + '/pods'
request.get(url + path, function (err, response, body) {
if (err) throw err
callback(JSON.parse(body))
})
}
module.exports = Pods
})()

67
models/poolRequests.js Normal file
View File

@ -0,0 +1,67 @@
;(function () {
'use strict'
var mongoose = require('mongoose')
var logger = require('../helpers/logger')
// ---------------------------------------------------------------------------
var poolRequestsSchema = mongoose.Schema({
type: String,
id: String, // Special id to find duplicates (video created we want to remove...)
request: mongoose.Schema.Types.Mixed
})
var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema)
// ---------------------------------------------------------------------------
var PoolRequests = {
addRequest: addRequest,
list: list,
removeRequests: removeRequests
}
function addRequest (id, type, request) {
logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
PoolRequestsDB.findOne({ id: id }, function (err, entity) {
if (err) logger.error(err)
if (entity) {
if (entity.type === type) {
logger.error(new Error('Cannot insert two same requests.'))
return
}
// Remove the request of the other type
PoolRequestsDB.remove({ id: id }, function (err) {
if (err) logger.error(err)
})
} else {
PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
if (err) logger.error(err)
})
}
})
}
function list (callback) {
PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, callback)
}
function removeRequests (ids) {
PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) {
if (err) {
logger.error('Cannot remove requests from the pool requests database.', { error: err })
return
}
logger.info('Pool requests flushed.')
})
}
// ---------------------------------------------------------------------------
module.exports = PoolRequests
})()

View File

@ -5,71 +5,62 @@
var config = require('config')
var dz = require('dezalgo')
var fs = require('fs')
var webtorrent = require('../lib/webTorrentNode')
var mongoose = require('mongoose')
var logger = require('../helpers/logger')
var pods = require('./pods')
var VideosDB = require('../initializers/database').VideosDB
var http = config.get('webserver.https') === true ? 'https' : 'http'
var host = config.get('webserver.host')
var port = config.get('webserver.port')
var uploadDir = __dirname + '/../' + config.get('storage.uploads')
var videos = {
// ---------------------------------------------------------------------------
var videosSchema = mongoose.Schema({
name: String,
namePath: String,
description: String,
magnetUri: String,
podUrl: String
})
var VideosDB = mongoose.model('videos', videosSchema)
// ---------------------------------------------------------------------------
var Videos = {
add: add,
addRemotes: addRemotes,
get: get,
getVideoState: getVideoState,
isOwned: isOwned,
list: list,
listOwned: listOwned,
remove: remove,
removeOwned: removeOwned,
removeAllRemotes: removeAllRemotes,
removeAllRemotesOf: removeAllRemotesOf,
removeRemotes: removeRemotes,
search: search,
seedAll: seedAll,
uploadDir: uploadDir
removeRemotesOfByMagnetUris: removeRemotesOfByMagnetUris,
search: search
}
// ----------- Public attributes ----------
var uploadDir = __dirname + '/../' + config.get('storage.uploads')
function add (video, callback) {
logger.info('Adding %s video to database.', video.name)
function add (data, callback) {
var video_file = data.video
var video_data = data.data
var params = video
params.podUrl = http + '://' + host + ':' + port
logger.info('Adding %s video.', video_file.path)
seedVideo(video_file.path, function (err, torrent) {
VideosDB.create(params, function (err, video) {
if (err) {
logger.error('Cannot seed this video.', { error: err })
logger.error('Cannot insert this video into database.', { error: err })
return callback(err)
}
var params = {
name: video_data.name,
namePath: video_file.filename,
description: video_data.description,
magnetUri: torrent.magnetURI,
podUrl: http + '://' + host + ':' + port
}
VideosDB.create(params, function (err, video) {
if (err) {
logger.error('Cannot insert this video.', { error: err })
return callback(err)
}
// Now we'll add the video's meta data to our friends
params.namePath = null
pods.addVideoToFriends(params)
callback(null)
})
callback(null)
})
}
// TODO: avoid doublons
function addRemotes (videos, callback) {
if (callback === undefined) callback = function () {}
if (!callback) callback = function () {}
var to_add = []
@ -111,6 +102,38 @@
})
}
function getVideoState (id, callback) {
get(id, function (err, video) {
if (err) return callback(err)
var exist = (video !== null)
var owned = false
if (exist === true) {
owned = (video.namePath !== null)
}
return callback(null, { exist: exist, owned: owned })
})
}
function isOwned (id, callback) {
VideosDB.findById(id, function (err, video) {
if (err || !video) {
if (!err) err = new Error('Cannot find this video.')
logger.error('Cannot find this video.', { error: err })
return callback(err)
}
if (video.namePath === null) {
var error_string = 'Cannot remove the video of another pod.'
logger.error(error_string)
return callback(null, false, video)
}
callback(null, true, video)
})
}
function list (callback) {
VideosDB.find(function (err, videos_list) {
if (err) {
@ -134,76 +157,35 @@
})
}
function remove (id, callback) {
// Maybe the torrent is not seeded, but we catch the error to don't stop the removing process
function removeTorrent (magnetUri, callback) {
try {
webtorrent.remove(magnetUri, callback)
} catch (err) {
logger.warn('Cannot remove the torrent from WebTorrent', { err: err })
return callback(null)
}
}
VideosDB.findById(id, function (err, video) {
if (err || !video) {
if (!err) err = new Error('Cannot find this video.')
logger.error('Cannot find this video.', { error: err })
function removeOwned (id, callback) {
VideosDB.findByIdAndRemove(id, function (err, video) {
if (err) {
logger.error('Cannot remove the torrent.', { error: err })
return callback(err)
}
if (video.namePath === null) {
var error_string = 'Cannot remove the video of another pod.'
logger.error(error_string)
return callback(new Error(error_string))
}
fs.unlink(uploadDir + video.namePath, function (err) {
if (err) {
logger.error('Cannot remove this video file.', { error: err })
return callback(err)
}
logger.info('Removing %s video', video.name)
removeTorrent(video.magnetUri, function () {
VideosDB.findByIdAndRemove(id, function (err) {
if (err) {
logger.error('Cannot remove the torrent.', { error: err })
return callback(err)
}
fs.unlink(uploadDir + video.namePath, function (err) {
if (err) {
logger.error('Cannot remove this video file.', { error: err })
return callback(err)
}
var params = {
name: video.name,
magnetUri: video.magnetUri
}
pods.removeVideoToFriends(params)
callback(null)
})
})
callback(null)
})
})
}
function removeAllRemotes (callback) {
VideosDB.remove({ namePath: null }, function (err) {
if (err) return callback(err)
callback(null)
})
VideosDB.remove({ namePath: null }, callback)
}
function removeAllRemotesOf (fromUrl, callback) {
VideosDB.remove({ podUrl: fromUrl }, function (err) {
if (err) return callback(err)
callback(null)
})
// TODO { podUrl: { $in: urls } }
VideosDB.remove({ podUrl: fromUrl }, callback)
}
// Use the magnet Uri because the _id field is not the same on different servers
function removeRemotes (fromUrl, magnetUris, callback) {
function removeRemotesOfByMagnetUris (fromUrl, magnetUris, callback) {
if (callback === undefined) callback = function () {}
VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) {
@ -248,39 +230,7 @@
})
}
function seedAll (callback) {
VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) {
if (err) {
logger.error('Cannot get list of the videos to seed.', { error: err })
return callback(err)
}
async.each(videos_list, function (video, each_callback) {
seedVideo(uploadDir + video.namePath, function (err) {
if (err) {
logger.error('Cannot seed this video.', { error: err })
return callback(err)
}
each_callback(null)
})
}, callback)
})
}
// ---------------------------------------------------------------------------
module.exports = videos
// ---------------------------------------------------------------------------
function seedVideo (path, callback) {
logger.info('Seeding %s...', path)
webtorrent.seed(path, function (torrent) {
logger.info('%s seeded (%s).', path, torrent.magnetURI)
return callback(null, torrent)
})
}
module.exports = Videos
})()

View File

@ -30,16 +30,20 @@
var config = require('config')
var constants = require('./initializers/constants')
var customValidators = require('./helpers/customValidators')
var database = require('./initializers/database')
var logger = require('./helpers/logger')
var poolRequests = require('./lib/poolRequests')
var routes = require('./controllers')
var utils = require('./helpers/utils')
var videos = require('./models/videos')
var videos = require('./lib/videos')
var webtorrent = require('./lib/webTorrentNode')
// Get configurations
var port = config.get('listen.port')
// ----------- Database -----------
database.connect()
// ----------- Command line -----------
// ----------- App -----------
@ -153,7 +157,7 @@
// Activate the pool requests
poolRequests.activate()
videos.seedAll(function () {
videos.seedAllExisting(function () {
logger.info('Seeded all the videos')
logger.info('Server listening on port %d', port)
app.emit('ready')