Request model refractoring -> use mongoose api
This commit is contained in:
parent
aaf61f3810
commit
00057e85a7
18
server.js
18
server.js
|
@ -21,24 +21,26 @@ if (miss.length !== 0) {
|
||||||
throw new Error('Miss some configurations keys : ' + miss)
|
throw new Error('Miss some configurations keys : ' + miss)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------- PeerTube modules -----------
|
// ----------- Database -----------
|
||||||
const config = require('config')
|
const config = require('config')
|
||||||
const constants = require('./server/initializers/constants')
|
const constants = require('./server/initializers/constants')
|
||||||
const customValidators = require('./server/helpers/customValidators')
|
|
||||||
const database = require('./server/initializers/database')
|
const database = require('./server/initializers/database')
|
||||||
const installer = require('./server/initializers/installer')
|
|
||||||
const logger = require('./server/helpers/logger')
|
const logger = require('./server/helpers/logger')
|
||||||
const poolRequests = require('./server/lib/requestsScheduler')
|
|
||||||
|
database.connect()
|
||||||
|
|
||||||
|
// ----------- PeerTube modules -----------
|
||||||
|
const customValidators = require('./server/helpers/customValidators')
|
||||||
|
const installer = require('./server/initializers/installer')
|
||||||
|
const mongoose = require('mongoose')
|
||||||
const routes = require('./server/controllers')
|
const routes = require('./server/controllers')
|
||||||
const utils = require('./server/helpers/utils')
|
const utils = require('./server/helpers/utils')
|
||||||
const webtorrent = require('./server/lib/webtorrent')
|
const webtorrent = require('./server/lib/webtorrent')
|
||||||
|
const Request = mongoose.model('Request')
|
||||||
|
|
||||||
// Get configurations
|
// Get configurations
|
||||||
const port = config.get('listen.port')
|
const port = config.get('listen.port')
|
||||||
|
|
||||||
// ----------- Database -----------
|
|
||||||
database.connect()
|
|
||||||
|
|
||||||
// ----------- Command line -----------
|
// ----------- Command line -----------
|
||||||
|
|
||||||
// ----------- App -----------
|
// ----------- App -----------
|
||||||
|
@ -135,7 +137,7 @@ installer.installApplication(function (err) {
|
||||||
// ----------- Make the server listening -----------
|
// ----------- Make the server listening -----------
|
||||||
server.listen(port, function () {
|
server.listen(port, function () {
|
||||||
// Activate the pool requests
|
// Activate the pool requests
|
||||||
poolRequests.activate()
|
Request.activate()
|
||||||
|
|
||||||
// videos.seedAllExisting(function () {
|
// videos.seedAllExisting(function () {
|
||||||
logger.info('Seeded all the videos')
|
logger.info('Seeded all the videos')
|
||||||
|
|
|
@ -7,6 +7,8 @@ const logger = require('../helpers/logger')
|
||||||
|
|
||||||
// Bootstrap models
|
// Bootstrap models
|
||||||
require('../models/video')
|
require('../models/video')
|
||||||
|
// Request model needs Video model
|
||||||
|
require('../models/request')
|
||||||
|
|
||||||
const dbname = 'peertube' + config.get('database.suffix')
|
const dbname = 'peertube' + config.get('database.suffix')
|
||||||
const host = config.get('database.host')
|
const host = config.get('database.host')
|
||||||
|
|
|
@ -10,12 +10,12 @@ const constants = require('../initializers/constants')
|
||||||
const logger = require('../helpers/logger')
|
const logger = require('../helpers/logger')
|
||||||
const peertubeCrypto = require('../helpers/peertubeCrypto')
|
const peertubeCrypto = require('../helpers/peertubeCrypto')
|
||||||
const Pods = require('../models/pods')
|
const Pods = require('../models/pods')
|
||||||
const requestsScheduler = require('../lib/requestsScheduler')
|
|
||||||
const requests = require('../helpers/requests')
|
const requests = require('../helpers/requests')
|
||||||
|
|
||||||
const http = config.get('webserver.https') ? 'https' : 'http'
|
const http = config.get('webserver.https') ? 'https' : 'http'
|
||||||
const host = config.get('webserver.host')
|
const host = config.get('webserver.host')
|
||||||
const port = config.get('webserver.port')
|
const port = config.get('webserver.port')
|
||||||
|
const Request = mongoose.model('Request')
|
||||||
const Video = mongoose.model('Video')
|
const Video = mongoose.model('Video')
|
||||||
|
|
||||||
const pods = {
|
const pods = {
|
||||||
|
@ -29,10 +29,7 @@ const pods = {
|
||||||
}
|
}
|
||||||
|
|
||||||
function addVideoToFriends (video) {
|
function addVideoToFriends (video) {
|
||||||
// ensure namePath is null
|
createRequest('add', video)
|
||||||
video.namePath = null
|
|
||||||
|
|
||||||
requestsScheduler.addRequest('add', video)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function hasFriends (callback) {
|
function hasFriends (callback) {
|
||||||
|
@ -76,9 +73,9 @@ function makeFriends (callback) {
|
||||||
|
|
||||||
function quitFriends (callback) {
|
function quitFriends (callback) {
|
||||||
// Stop pool requests
|
// Stop pool requests
|
||||||
requestsScheduler.deactivate()
|
Request.deactivate()
|
||||||
// Flush pool requests
|
// Flush pool requests
|
||||||
requestsScheduler.flush()
|
Request.flush()
|
||||||
|
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
function getPodsList (callbackAsync) {
|
function getPodsList (callbackAsync) {
|
||||||
|
@ -127,7 +124,7 @@ function quitFriends (callback) {
|
||||||
}
|
}
|
||||||
], function (err) {
|
], function (err) {
|
||||||
// Don't forget to re activate the scheduler, even if there was an error
|
// Don't forget to re activate the scheduler, even if there was an error
|
||||||
requestsScheduler.activate()
|
Request.activate()
|
||||||
|
|
||||||
if (err) return callback(err)
|
if (err) return callback(err)
|
||||||
|
|
||||||
|
@ -136,8 +133,8 @@ function quitFriends (callback) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function removeVideoToFriends (video) {
|
function removeVideoToFriends (videoParams) {
|
||||||
requestsScheduler.addRequest('remove', video)
|
createRequest('remove', videoParams)
|
||||||
}
|
}
|
||||||
|
|
||||||
function sendOwnedVideosToPod (podId) {
|
function sendOwnedVideosToPod (podId) {
|
||||||
|
@ -155,7 +152,7 @@ function sendOwnedVideosToPod (podId) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo)
|
createRequest('add', remoteVideo, [ podId ])
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -211,9 +208,9 @@ function getForeignPodsList (url, callback) {
|
||||||
|
|
||||||
function makeRequestsToWinningPods (cert, podsList, callback) {
|
function makeRequestsToWinningPods (cert, podsList, callback) {
|
||||||
// Stop pool requests
|
// Stop pool requests
|
||||||
requestsScheduler.deactivate()
|
Request.deactivate()
|
||||||
// Flush pool requests
|
// Flush pool requests
|
||||||
requestsScheduler.forceSend()
|
Request.forceSend()
|
||||||
|
|
||||||
async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
|
async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
|
||||||
const params = {
|
const params = {
|
||||||
|
@ -249,9 +246,26 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
|
||||||
}, function endRequests () {
|
}, function endRequests () {
|
||||||
// Final callback, we've ended all the requests
|
// Final callback, we've ended all the requests
|
||||||
// Now we made new friends, we can re activate the pool of requests
|
// Now we made new friends, we can re activate the pool of requests
|
||||||
requestsScheduler.activate()
|
Request.activate()
|
||||||
|
|
||||||
logger.debug('makeRequestsToWinningPods finished.')
|
logger.debug('makeRequestsToWinningPods finished.')
|
||||||
return callback()
|
return callback()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function createRequest (type, data, to) {
|
||||||
|
const req = new Request({
|
||||||
|
request: {
|
||||||
|
type: type,
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if (to) {
|
||||||
|
req.to = to
|
||||||
|
}
|
||||||
|
|
||||||
|
req.save(function (err) {
|
||||||
|
if (err) logger.error('Cannot save the request.', { error: err })
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const mongoose = require('mongoose')
|
const mongoose = require('mongoose')
|
||||||
|
const map = require('lodash/map')
|
||||||
|
|
||||||
const constants = require('../initializers/constants')
|
const constants = require('../initializers/constants')
|
||||||
const logger = require('../helpers/logger')
|
const logger = require('../helpers/logger')
|
||||||
|
@ -76,7 +77,11 @@ function list (callback) {
|
||||||
}
|
}
|
||||||
|
|
||||||
function listAllIds (callback) {
|
function listAllIds (callback) {
|
||||||
return PodsDB.find({}, { _id: 1 }, callback)
|
return PodsDB.find({}, { _id: 1 }, function (err, pods) {
|
||||||
|
if (err) return callback(err)
|
||||||
|
|
||||||
|
return callback(null, map(pods, '_id'))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function listAllUrls (callback) {
|
function listAllUrls (callback) {
|
||||||
|
|
|
@ -7,60 +7,51 @@ const mongoose = require('mongoose')
|
||||||
const constants = require('../initializers/constants')
|
const constants = require('../initializers/constants')
|
||||||
const logger = require('../helpers/logger')
|
const logger = require('../helpers/logger')
|
||||||
const Pods = require('../models/pods')
|
const Pods = require('../models/pods')
|
||||||
const Requests = require('../models/requests')
|
|
||||||
const requests = require('../helpers/requests')
|
const requests = require('../helpers/requests')
|
||||||
|
|
||||||
const Video = mongoose.model('Video')
|
const Video = mongoose.model('Video')
|
||||||
|
|
||||||
let timer = null
|
let timer = null
|
||||||
|
|
||||||
const requestsScheduler = {
|
// ---------------------------------------------------------------------------
|
||||||
activate: activate,
|
|
||||||
addRequest: addRequest,
|
const RequestSchema = mongoose.Schema({
|
||||||
addRequestTo: addRequestTo,
|
request: mongoose.Schema.Types.Mixed,
|
||||||
deactivate: deactivate,
|
to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ]
|
||||||
flush: flush,
|
})
|
||||||
forceSend: forceSend
|
|
||||||
|
RequestSchema.statics = {
|
||||||
|
activate,
|
||||||
|
deactivate,
|
||||||
|
flush,
|
||||||
|
forceSend
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RequestSchema.pre('save', function (next) {
|
||||||
|
const self = this
|
||||||
|
|
||||||
|
if (self.to.length === 0) {
|
||||||
|
Pods.listAllIds(function (err, podIds) {
|
||||||
|
if (err) return next(err)
|
||||||
|
|
||||||
|
// No friends
|
||||||
|
if (podIds.length === 0) return
|
||||||
|
|
||||||
|
self.to = podIds
|
||||||
|
return next()
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
return next()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
mongoose.model('Request', RequestSchema)
|
||||||
|
|
||||||
|
// ------------------------------ STATICS ------------------------------
|
||||||
|
|
||||||
function activate () {
|
function activate () {
|
||||||
logger.info('Requests scheduler activated.')
|
logger.info('Requests scheduler activated.')
|
||||||
timer = setInterval(makeRequests, constants.INTERVAL)
|
timer = setInterval(makeRequests.bind(this), constants.INTERVAL)
|
||||||
}
|
|
||||||
|
|
||||||
// Add request to the scheduler
|
|
||||||
function addRequest (type, data) {
|
|
||||||
logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
|
|
||||||
|
|
||||||
const request = {
|
|
||||||
type: type,
|
|
||||||
data: data
|
|
||||||
}
|
|
||||||
|
|
||||||
Pods.listAllIds(function (err, podIds) {
|
|
||||||
if (err) {
|
|
||||||
logger.debug('Cannot list pod ids.')
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// No friends
|
|
||||||
if (!podIds) return
|
|
||||||
|
|
||||||
Requests.create(request, podIds, function (err) {
|
|
||||||
if (err) logger.error('Cannot create a request.', { error: err })
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
function addRequestTo (podIds, type, data) {
|
|
||||||
const request = {
|
|
||||||
type: type,
|
|
||||||
data: data
|
|
||||||
}
|
|
||||||
|
|
||||||
Requests.create(request, podIds, function (err) {
|
|
||||||
if (err) logger.error('Cannot create a request.', { error: err })
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function deactivate () {
|
function deactivate () {
|
||||||
|
@ -69,24 +60,18 @@ function deactivate () {
|
||||||
}
|
}
|
||||||
|
|
||||||
function flush () {
|
function flush () {
|
||||||
Requests.removeAll(function (err) {
|
removeAll.call(this, function (err) {
|
||||||
if (err) {
|
if (err) logger.error('Cannot flush the requests.', { error: err })
|
||||||
logger.error('Cannot flush the requests.', { error: err })
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function forceSend () {
|
function forceSend () {
|
||||||
logger.info('Force requests scheduler sending.')
|
logger.info('Force requests scheduler sending.')
|
||||||
makeRequests()
|
makeRequests.call(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
module.exports = requestsScheduler
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
// Make a requests to friends of a certain type
|
// Make a requests to friends of a certain type
|
||||||
function makeRequest (toPod, requestsToMake, callback) {
|
function makeRequest (toPod, requestsToMake, callback) {
|
||||||
if (!callback) callback = function () {}
|
if (!callback) callback = function () {}
|
||||||
|
@ -115,7 +100,9 @@ function makeRequest (toPod, requestsToMake, callback) {
|
||||||
|
|
||||||
// Make all the requests of the scheduler
|
// Make all the requests of the scheduler
|
||||||
function makeRequests () {
|
function makeRequests () {
|
||||||
Requests.list(function (err, requests) {
|
const self = this
|
||||||
|
|
||||||
|
list.call(self, function (err, requests) {
|
||||||
if (err) {
|
if (err) {
|
||||||
logger.error('Cannot get the list of requests.', { err: err })
|
logger.error('Cannot get the list of requests.', { err: err })
|
||||||
return // Abort
|
return // Abort
|
||||||
|
@ -154,11 +141,14 @@ function makeRequests () {
|
||||||
|
|
||||||
// FIXME: mongodb request inside a loop :/
|
// FIXME: mongodb request inside a loop :/
|
||||||
Pods.findById(toPodId, function (err, toPod) {
|
Pods.findById(toPodId, function (err, toPod) {
|
||||||
if (err) return logger.error('Error finding pod by id.', { err: err })
|
if (err) {
|
||||||
|
logger.error('Error finding pod by id.', { err: err })
|
||||||
|
return callbackEach()
|
||||||
|
}
|
||||||
|
|
||||||
// Maybe the pod is not our friend anymore so simply remove them
|
// Maybe the pod is not our friend anymore so simply remove them
|
||||||
if (!toPod) {
|
if (!toPod) {
|
||||||
Requests.removePodOf(requestToMake.ids, toPodId)
|
removePodOf.call(self, requestToMake.ids, toPodId)
|
||||||
return callbackEach()
|
return callbackEach()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,7 +163,7 @@ function makeRequests () {
|
||||||
logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
|
logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
|
||||||
|
|
||||||
// Remove the pod id of these request ids
|
// Remove the pod id of these request ids
|
||||||
Requests.removePodOf(requestToMake.ids, toPodId)
|
removePodOf.call(self, requestToMake.ids, toPodId)
|
||||||
goodPods.push(toPodId)
|
goodPods.push(toPodId)
|
||||||
} else {
|
} else {
|
||||||
badPods.push(toPodId)
|
badPods.push(toPodId)
|
||||||
|
@ -186,7 +176,7 @@ function makeRequests () {
|
||||||
// All the requests were made, we update the pods score
|
// All the requests were made, we update the pods score
|
||||||
updatePodsScore(goodPods, badPods)
|
updatePodsScore(goodPods, badPods)
|
||||||
// Flush requests with no pod
|
// Flush requests with no pod
|
||||||
Requests.removeWithEmptyTo()
|
removeWithEmptyTo.call(self)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -268,3 +258,23 @@ function updatePodsScore (goodPods, badPods) {
|
||||||
removeBadPods()
|
removeBadPods()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function list (callback) {
|
||||||
|
this.find({ }, { _id: 1, request: 1, to: 1 }, callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
function removeAll (callback) {
|
||||||
|
this.remove({ }, callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
function removePodOf (requestsIds, podId, callback) {
|
||||||
|
if (!callback) callback = function () {}
|
||||||
|
|
||||||
|
this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
function removeWithEmptyTo (callback) {
|
||||||
|
if (!callback) callback = function () {}
|
||||||
|
|
||||||
|
this.remove({ to: { $size: 0 } }, callback)
|
||||||
|
}
|
|
@ -1,73 +0,0 @@
|
||||||
'use strict'
|
|
||||||
|
|
||||||
const mongoose = require('mongoose')
|
|
||||||
|
|
||||||
const logger = require('../helpers/logger')
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
const requestsSchema = mongoose.Schema({
|
|
||||||
request: mongoose.Schema.Types.Mixed,
|
|
||||||
to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ]
|
|
||||||
})
|
|
||||||
const RequestsDB = mongoose.model('requests', requestsSchema)
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
const Requests = {
|
|
||||||
create: create,
|
|
||||||
findById: findById,
|
|
||||||
list: list,
|
|
||||||
removeAll: removeAll,
|
|
||||||
removePodOf: removePodOf,
|
|
||||||
removeRequestById: removeRequestById,
|
|
||||||
removeRequests: removeRequests,
|
|
||||||
removeWithEmptyTo: removeWithEmptyTo
|
|
||||||
}
|
|
||||||
|
|
||||||
function create (request, to, callback) {
|
|
||||||
RequestsDB.create({ request: request, to: to }, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
function findById (id, callback) {
|
|
||||||
RequestsDB.findOne({ id: id }, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
function list (callback) {
|
|
||||||
RequestsDB.find({}, { _id: 1, request: 1, to: 1 }, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
function removeAll (callback) {
|
|
||||||
RequestsDB.remove({ }, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
function removePodOf (requestsIds, podId, callback) {
|
|
||||||
if (!callback) callback = function () {}
|
|
||||||
|
|
||||||
RequestsDB.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
function removeRequestById (id, callback) {
|
|
||||||
RequestsDB.remove({ id: id }, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
function removeRequests (ids) {
|
|
||||||
RequestsDB.remove({ _id: { $in: ids } }, function (err) {
|
|
||||||
if (err) {
|
|
||||||
logger.error('Cannot remove requests from the requests database.', { error: err })
|
|
||||||
return // Abort
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info('Pool requests flushed.')
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
function removeWithEmptyTo (callback) {
|
|
||||||
if (!callback) callback = function () {}
|
|
||||||
|
|
||||||
RequestsDB.remove({ to: { $size: 0 } }, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
module.exports = Requests
|
|
|
@ -414,7 +414,7 @@ describe('Test multiple pods', function () {
|
||||||
|
|
||||||
// Keep the logs if the test failed
|
// Keep the logs if the test failed
|
||||||
if (this.ok) {
|
if (this.ok) {
|
||||||
// utils.flushTests(done)
|
utils.flushTests(done)
|
||||||
} else {
|
} else {
|
||||||
done()
|
done()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue