Source refractoring

This commit is contained in:
Chocobozzz 2015-11-16 13:54:00 +01:00
parent 16357143ae
commit a18603803a
7 changed files with 156 additions and 119 deletions

View File

@ -1,20 +1,22 @@
;(function () { ;(function () {
'use strict' 'use strict'
// ----------- Constantes ----------- // ----------- Constants -----------
global.API_VERSION = 'v1' global.API_VERSION = 'v1'
// ----------- Node modules ----------- // ----------- Node modules -----------
var bodyParser = require('body-parser')
var express = require('express') var express = require('express')
var expressValidator = require('express-validator') var expressValidator = require('express-validator')
var path = require('path') var http = require('http')
var morgan = require('morgan') var morgan = require('morgan')
var bodyParser = require('body-parser')
var multer = require('multer') var multer = require('multer')
var path = require('path')
var TrackerServer = require('bittorrent-tracker').Server var TrackerServer = require('bittorrent-tracker').Server
var WebSocketServer = require('ws').Server var WebSocketServer = require('ws').Server
// Create our main app
var app = express() var app = express()
var http = require('http')
// ----------- Checker ----------- // ----------- Checker -----------
var checker = require('./src/checker') var checker = require('./src/checker')
@ -26,7 +28,7 @@
process.exit(0) process.exit(0)
} }
checker.createDirectories() checker.createDirectoriesIfNotExist()
// ----------- PeerTube modules ----------- // ----------- PeerTube modules -----------
var config = require('config') var config = require('config')
@ -36,16 +38,22 @@
var videos = require('./src/videos') var videos = require('./src/videos')
var webtorrent = require('./src/webTorrentNode') var webtorrent = require('./src/webTorrentNode')
// Get configurations
var port = config.get('listen.port') var port = config.get('listen.port')
var uploads = config.get('storage.uploads') var uploads = config.get('storage.uploads')
// ----------- Command line ----------- // ----------- Command line -----------
// ----------- App ----------- // ----------- App -----------
// For the logger
app.use(morgan('combined', { stream: logger.stream })) app.use(morgan('combined', { stream: logger.stream }))
// For body requests
app.use(bodyParser.json()) app.use(bodyParser.json())
// For POST file requests
app.use(multer({ dest: uploads })) app.use(multer({ dest: uploads }))
app.use(bodyParser.urlencoded({ extended: false })) app.use(bodyParser.urlencoded({ extended: false }))
// Validate some params for the API
app.use(expressValidator()) app.use(expressValidator())
// ----------- Views, routes and static files ----------- // ----------- Views, routes and static files -----------
@ -55,15 +63,17 @@
port: 35729 port: 35729
})) }))
// Catch sefaults
require('segfault-handler').registerHandler() require('segfault-handler').registerHandler()
// Static files
app.use(express.static(path.join(__dirname, '/public'), { maxAge: 0 })) app.use(express.static(path.join(__dirname, '/public'), { maxAge: 0 }))
// Jade template from ./views directory // Jade template from ./views directory
app.set('views', path.join(__dirname, '/views')) app.set('views', path.join(__dirname, '/views'))
app.set('view engine', 'jade') app.set('view engine', 'jade')
// API // API routes
var api_route = '/api/' + global.API_VERSION var api_route = '/api/' + global.API_VERSION
app.use(api_route, routes.api) app.use(api_route, routes.api)

View File

@ -25,7 +25,7 @@
} }
// Create directories for the storage if it doesn't exist // Create directories for the storage if it doesn't exist
checker.createDirectories = function () { checker.createDirectoriesIfNotExist = function () {
var storages = config.get('storage') var storages = config.get('storage')
for (var key of Object.keys(storages)) { for (var key of Object.keys(storages)) {

View File

@ -1,10 +1,9 @@
;(function () { ;(function () {
// Thanks http://tostring.it/2014/06/23/advanced-logging-with-nodejs/ // Thanks http://tostring.it/2014/06/23/advanced-logging-with-nodejs/
'use strict' 'use strict'
var winston = require('winston')
var config = require('config') var config = require('config')
var winston = require('winston')
var logDir = __dirname + '/../' + config.get('storage.logs') var logDir = __dirname + '/../' + config.get('storage.logs')

View File

@ -1,16 +1,17 @@
;(function () { ;(function () {
'use strict' 'use strict'
var fs = require('fs')
var config = require('config')
var async = require('async') var async = require('async')
var config = require('config')
var fs = require('fs')
var request = require('request') var request = require('request')
var logger = require('./logger') var logger = require('./logger')
var utils = require('./utils')
var PodsDB = require('./database').PodsDB var PodsDB = require('./database').PodsDB
var utils = require('./utils')
var pods = {} var pods = {}
var http = config.get('webserver.https') ? 'https' : 'http' var http = config.get('webserver.https') ? 'https' : 'http'
var host = config.get('webserver.host') var host = config.get('webserver.host')
var port = config.get('webserver.port') var port = config.get('webserver.port')
@ -27,6 +28,7 @@
} }
// ----------- Public functions ----------- // ----------- Public functions -----------
pods.list = function (callback) { pods.list = function (callback) {
PodsDB.find(function (err, pods_list) { PodsDB.find(function (err, pods_list) {
if (err) { if (err) {
@ -73,18 +75,27 @@
} }
logger.debug('Make multiple requests.') logger.debug('Make multiple requests.')
var params = {
encrypt: true,
sign: true,
method: data.method,
path: data.path,
data: data.data
}
utils.makeMultipleRetryRequest( utils.makeMultipleRetryRequest(
{ encrypt: true, sign: true, method: data.method, path: data.path, data: data.data }, params,
urls, urls,
function (err, response, body, url) { function callbackEachPodFinished (err, response, body, url) {
if (err || response.statusCode !== 200) { if (err || response.statusCode !== 200) {
logger.error('Error sending secure request to %s/%s pod.', url, data.path, { error: err }) logger.error('Error sending secure request to %s/%s pod.', url, data.path, { error: err })
} }
}, },
function (err) { function callbackAllPodsFinished (err) {
if (err) { if (err) {
logger.error('There was some errors when sending the video meta data.', { error: err }) logger.error('There was some errors when sending the video meta data.', { error: err })
return callback(err) return callback(err)
@ -98,7 +109,9 @@
} }
pods.makeFriends = function (callback) { pods.makeFriends = function (callback) {
logger.debug('Read public key...') var pods_score = {}
logger.info('Make friends!')
fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) { fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) {
if (err) { if (err) {
logger.error('Cannot read public cert.', { error: err }) logger.error('Cannot read public cert.', { error: err })
@ -106,71 +119,86 @@
} }
var urls = config.get('network.friends') var urls = config.get('network.friends')
var pods_score = {}
async.each(urls, function (url, callback) { async.each(urls, computeForeignPodsList, function () {
// Always add a trust pod logger.debug('Pods scores computed.', { pods_score: pods_score })
pods_score[url] = Infinity var pods_list = computeWinningPods(urls, pods_score)
logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list })
getForeignPodsList(url, function (foreign_pods_list) {
if (foreign_pods_list.length === 0) return callback()
async.each(foreign_pods_list, function (foreign_pod, callback) {
var foreign_url = foreign_pod.url
if (pods_score[foreign_url]) pods_score[foreign_url]++
else pods_score[foreign_url] = 1
callback()
}, callback)
})
}, function () {
logger.debug('Pods score', { pods_score: pods_score })
// Build the list of pods to add
// Only add a pod if it exists in more than a half base pods
var pods_list = []
var base_score = urls.length / 2
Object.keys(pods_score).forEach(function (pod) {
if (pods_score[pod] > base_score) pods_list.push({ url: pod })
})
logger.debug('Pods that we keep', { pods: pods_list })
var data = {
url: http + '://' + host + ':' + port,
publicKey: cert
}
logger.debug('Make requests...') logger.debug('Make requests...')
makeRequestsToWinningPods(cert, pods_list)
utils.makeMultipleRetryRequest(
{ method: 'POST', path: '/api/' + global.API_VERSION + '/pods/', data: data },
pods_list,
function eachRequest (err, response, body, url) {
if (!err && response.statusCode === 200) {
pods.add({ url: url, publicKey: body.cert }, function (err) {
if (err) {
logger.error('Error with adding %s pod.', url, { error: err })
}
})
} else {
logger.error('Error with adding %s pod.', url)
}
},
function endRequests (err) {
if (err) {
logger.error('There was some errors when we wanted to make friends.', { error: err })
return callback(err)
}
logger.debug('Finished')
callback(null)
}
)
}) })
}) })
// -----------------------------------------------------------------------
function computeForeignPodsList (url, callback) {
// Always add a trust pod
pods_score[url] = Infinity
getForeignPodsList(url, function (foreign_pods_list) {
if (foreign_pods_list.length === 0) return callback()
async.each(foreign_pods_list, function (foreign_pod, callback_each) {
var foreign_url = foreign_pod.url
if (pods_score[foreign_url]) pods_score[foreign_url]++
else pods_score[foreign_url] = 1
callback_each()
}, function () {
callback()
})
})
}
function computeWinningPods (urls, pods_score) {
// Build the list of pods to add
// Only add a pod if it exists in more than a half base pods
var pods_list = []
var base_score = urls.length / 2
Object.keys(pods_score).forEach(function (pod) {
if (pods_score[pod] > base_score) pods_list.push({ url: pod })
})
return pods_list
}
function makeRequestsToWinningPods (cert, pods_list) {
var data = {
url: http + '://' + host + ':' + port,
publicKey: cert
}
utils.makeMultipleRetryRequest(
{ method: 'POST', path: '/api/' + global.API_VERSION + '/pods/', data: data },
pods_list,
function eachRequest (err, response, body, url) {
// We add the pod if it responded correctly with its public certificate
if (!err && response.statusCode === 200) {
pods.add({ url: url, publicKey: body.cert }, function (err) {
if (err) {
logger.error('Error with adding %s pod.', url, { error: err })
}
})
} else {
logger.error('Error with adding %s pod.', url, { error: err || new Error('Status not 200') })
}
},
function endRequests (err) {
if (err) {
logger.error('There was some errors when we wanted to make friends.', { error: err })
return callback(err)
}
logger.debug('makeRequestsToWinningPods finished.')
return callback(null)
}
)
}
} }
module.exports = pods module.exports = pods

View File

@ -1,23 +1,23 @@
;(function () { ;(function () {
'use strict' 'use strict'
var config = require('config')
var crypto = require('crypto')
var fs = require('fs')
var openssl = require('openssl-wrapper')
var request = require('request') var request = require('request')
var replay = require('request-replay') var replay = require('request-replay')
var ursa = require('ursa') var ursa = require('ursa')
var config = require('config')
var fs = require('fs')
var openssl = require('openssl-wrapper')
var crypto = require('crypto')
var logger = require('./logger') var logger = require('./logger')
var utils = {}
var http = config.get('webserver.https') ? 'https' : 'http' var http = config.get('webserver.https') ? 'https' : 'http'
var host = config.get('webserver.host') var host = config.get('webserver.host')
var port = config.get('webserver.port') var port = config.get('webserver.port')
var algorithm = 'aes-256-ctr' var algorithm = 'aes-256-ctr'
var utils = {}
// ----------- Private functions ---------- // ----------- Private functions ----------
function makeRetryRequest (params, from_url, to_pod, signature, callbackEach) { function makeRetryRequest (params, from_url, to_pod, signature, callbackEach) {
@ -29,7 +29,7 @@
} }
} }
logger.debug('Sending informations to %s', to_pod.url, { params: params }) logger.debug('Sending informations to %s.', to_pod.url, { params: params })
// Replay 15 times, with factor 3 // Replay 15 times, with factor 3
replay( replay(
@ -52,7 +52,7 @@
utils.certDir = __dirname + '/../' + config.get('storage.certs') utils.certDir = __dirname + '/../' + config.get('storage.certs')
// { path, data } // { path, data }
utils.makeMultipleRetryRequest = function (all, pods, callbackEach, callback) { utils.makeMultipleRetryRequest = function (all_data, pods, callbackEach, callback) {
if (!callback) { if (!callback) {
callback = callbackEach callback = callbackEach
callbackEach = function () {} callbackEach = function () {}
@ -61,8 +61,8 @@
var url = http + '://' + host + ':' + port var url = http + '://' + host + ':' + port
var signature var signature
// Signature ? // Add signature if it is specified in the params
if (all.method === 'POST' && all.data && all.sign === true) { if (all_data.method === 'POST' && all_data.data && all_data.sign === true) {
var myKey = ursa.createPrivateKey(fs.readFileSync(utils.certDir + 'peertube.key.pem')) var myKey = ursa.createPrivateKey(fs.readFileSync(utils.certDir + 'peertube.key.pem'))
signature = myKey.hashAndSign('sha256', url, 'utf8', 'hex') signature = myKey.hashAndSign('sha256', url, 'utf8', 'hex')
} }
@ -70,22 +70,21 @@
// Make a request for each pod // Make a request for each pod
for (var pod of pods) { for (var pod of pods) {
var params = { var params = {
url: pod.url + all.path, url: pod.url + all_data.path,
method: all.method method: all_data.method
} }
// Add data with POST requst ? // Add data with POST requst ?
if (all.method === 'POST' && all.data) { if (all_data.method === 'POST' && all_data.data) {
logger.debug('Make a POST request.') logger.debug('Make a POST request.')
// Encrypt data ? // Encrypt data ?
if (all.encrypt === true) { if (all_data.encrypt === true) {
logger.debug(pod.publicKey)
var crt = ursa.createPublicKey(pod.publicKey) var crt = ursa.createPublicKey(pod.publicKey)
// TODO: ES6 with let // TODO: ES6 with let
;(function (crt_copy, copy_params, copy_url, copy_pod, copy_signature) { ;(function (crt_copy, copy_params, copy_url, copy_pod, copy_signature) {
utils.symetricEncrypt(JSON.stringify(all.data), function (err, dataEncrypted) { utils.symetricEncrypt(JSON.stringify(all_data.data), function (err, dataEncrypted) {
if (err) throw err if (err) throw err
var passwordEncrypted = crt_copy.encrypt(dataEncrypted.password, 'utf8', 'hex') var passwordEncrypted = crt_copy.encrypt(dataEncrypted.password, 'utf8', 'hex')
@ -98,7 +97,7 @@
}) })
})(crt, params, url, pod, signature) })(crt, params, url, pod, signature)
} else { } else {
params.json = { data: all.data } params.json = { data: all_data.data }
makeRetryRequest(params, url, pod, signature, callbackEach) makeRetryRequest(params, url, pod, signature, callbackEach)
} }
} else { } else {
@ -124,20 +123,22 @@
return callback(new Error(string)) return callback(new Error(string))
} }
logger.debug('Gen RSA keys...') logger.info('Generating a RSA key...')
openssl.exec('genrsa', { 'out': utils.certDir + 'peertube.key.pem', '2048': false }, function (err) { openssl.exec('genrsa', { 'out': utils.certDir + 'peertube.key.pem', '2048': false }, function (err) {
if (err) { if (err) {
logger.error('Cannot create private key on this pod.', { error: err }) logger.error('Cannot create private key on this pod.', { error: err })
return callback(err) return callback(err)
} }
logger.info('RSA key generated.')
logger.debug('Manage public key...') logger.info('Manage public key...')
openssl.exec('rsa', { 'in': utils.certDir + 'peertube.key.pem', 'pubout': true, 'out': utils.certDir + 'peertube.pub' }, function (err) { openssl.exec('rsa', { 'in': utils.certDir + 'peertube.key.pem', 'pubout': true, 'out': utils.certDir + 'peertube.pub' }, function (err) {
if (err) { if (err) {
logger.error('Cannot create public key on this pod .', { error: err }) logger.error('Cannot create public key on this pod .', { error: err })
return callback(err) return callback(err)
} }
logger.info('Public key managed.')
return callback(null) return callback(null)
}) })
}) })

View File

@ -1,27 +1,27 @@
;(function () { ;(function () {
'use strict' 'use strict'
var async = require('async')
var config = require('config')
var fs = require('fs') var fs = require('fs')
var webtorrent = require('./webTorrentNode') var webtorrent = require('./webTorrentNode')
var config = require('config')
var async = require('async')
var logger = require('./logger') var logger = require('./logger')
var VideosDB = require('./database').VideosDB
var pods = require('./pods') var pods = require('./pods')
var VideosDB = require('./database').VideosDB
var videos = {} var videos = {}
// Public url
var http = config.get('webserver.https') === true ? 'https' : 'http' var http = config.get('webserver.https') === true ? 'https' : 'http'
var host = config.get('webserver.host') var host = config.get('webserver.host')
var port = config.get('webserver.port') var port = config.get('webserver.port')
// ----------- Private functions ----------- // ----------- Private functions -----------
function seedVideo (path, callback) { function seedVideo (path, callback) {
logger.debug('Seeding : %s', path) logger.info('Seeding %s...', path)
webtorrent.seed(path, function (torrent) { webtorrent.seed(path, function (torrent) {
logger.debug('Seeded : %s', torrent.magnetURI) logger.info('%s seeded (%s).', path, torrent.magnetURI)
return callback(null, torrent) return callback(null, torrent)
}) })
@ -46,7 +46,7 @@
var video_file = data.video var video_file = data.video
var video_data = data.data var video_data = data.data
logger.debug('Path: %s', video_file.path) logger.info('Adding %s video.', video_file.path)
seedVideo(video_file.path, function (err, torrent) { seedVideo(video_file.path, function (err, torrent) {
if (err) { if (err) {
logger.error('Cannot seed this video.', { error: err }) logger.error('Cannot seed this video.', { error: err })
@ -70,7 +70,7 @@
// Now we'll send the video's meta data // Now we'll send the video's meta data
params.namePath = null params.namePath = null
logger.debug('Sending this video Uri to friends...') logger.info('Sending %s video to friends.', video_file.path)
var data = { var data = {
path: '/api/' + global.API_VERSION + '/remotevideos/add', path: '/api/' + global.API_VERSION + '/remotevideos/add',
@ -91,7 +91,7 @@
} }
videos.remove = function (id, callback) { videos.remove = function (id, callback) {
// Maybe the torrent is not seeding, it doesn't have importance // Maybe the torrent is not seeded, but we catch the error to don't stop the removing process
function removeTorrent (magnetUri, callback) { function removeTorrent (magnetUri, callback) {
try { try {
webtorrent.remove(magnetUri, callback) webtorrent.remove(magnetUri, callback)
@ -114,7 +114,7 @@
return callback(new Error(error_string)) return callback(new Error(error_string))
} }
logger.debug('Removing video %s', video.magnetUri) logger.info('Removing %s video', video.name)
removeTorrent(video.magnetUri, function () { removeTorrent(video.magnetUri, function () {
VideosDB.findByIdAndRemove(id, function (err) { VideosDB.findByIdAndRemove(id, function (err) {
@ -154,16 +154,15 @@
// Use the magnet Uri because the _id field is not the same on different servers // Use the magnet Uri because the _id field is not the same on different servers
videos.removeRemote = function (fromUrl, magnetUri, callback) { videos.removeRemote = function (fromUrl, magnetUri, callback) {
// TODO : check if the remote server has the rights to remove this video
VideosDB.findOne({ magnetUri: magnetUri }, function (err, video) { VideosDB.findOne({ magnetUri: magnetUri }, function (err, video) {
if (err || !video) { if (err || !video) {
logger.error('Cannot find the torrent URI of this remote video.') logger.error('Cannot find the torrent URI of this remote video.')
return callback(err) return callback(err)
} }
// TODO: move to reqValidators middleware ?
if (video.podUrl !== fromUrl) { if (video.podUrl !== fromUrl) {
logger.error('The pod has not rights on this video.') logger.error('The pod has not the rights on this video.')
return callback(err) return callback(err)
} }
@ -222,23 +221,23 @@
}) })
} }
videos.seedAll = function (final_callback) { videos.seedAll = function (callback) {
VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) { VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) {
if (err) { if (err) {
logger.error('Cannot get list of the videos to seed.', { error: err }) logger.error('Cannot get list of the videos to seed.', { error: err })
return final_callback(err) return callback(err)
} }
async.each(videos_list, function (video, callback) { async.each(videos_list, function (video, each_callback) {
seedVideo(videos.uploadDir + video.namePath, function (err) { seedVideo(videos.uploadDir + video.namePath, function (err) {
if (err) { if (err) {
logger.error('Cannot seed this video.', { error: err }) logger.error('Cannot seed this video.', { error: err })
return callback(err) return callback(err)
} }
callback(null) each_callback(null)
}) })
}, final_callback) }, callback)
}) })
} }

View File

@ -1,10 +1,10 @@
;(function () { ;(function () {
'use strict' 'use strict'
var spawn = require('electron-spawn')
var config = require('config') var config = require('config')
var ipc = require('node-ipc') var ipc = require('node-ipc')
var pathUtils = require('path') var pathUtils = require('path')
var spawn = require('electron-spawn')
var logger = require('./logger') var logger = require('./logger')
@ -85,12 +85,12 @@
} }
} }
if (!webtorrentnode.silent) logger.debug('Node wants to seed ' + data._id) if (!webtorrentnode.silent) logger.debug('Node wants to seed %s.', data._id)
// Finish signal // Finish signal
var event_key = nodeKey + '.seedDone.' + data._id var event_key = nodeKey + '.seedDone.' + data._id
ipc.server.on(event_key, function listener (received) { ipc.server.on(event_key, function listener (received) {
if (!webtorrentnode.silent) logger.debug('Process seeded torrent ' + received.magnetUri) if (!webtorrentnode.silent) logger.debug('Process seeded torrent %s.', received.magnetUri)
// This is a fake object, we just use the magnetUri in this project // This is a fake object, we just use the magnetUri in this project
var torrent = { var torrent = {
@ -117,7 +117,7 @@
// Finish signal // Finish signal
var event_key = nodeKey + '.addDone.' + data._id var event_key = nodeKey + '.addDone.' + data._id
ipc.server.on(event_key, function (received) { ipc.server.on(event_key, function (received) {
if (!webtorrentnode.silent) logger.debug('Process added torrent') if (!webtorrentnode.silent) logger.debug('Process added torrent.')
// This is a fake object, we just use the magnetUri in this project // This is a fake object, we just use the magnetUri in this project
var torrent = { var torrent = {
@ -139,12 +139,12 @@
} }
} }
if (!webtorrentnode.silent) logger.debug('Node wants to stop seeding ' + data._id) if (!webtorrentnode.silent) logger.debug('Node wants to stop seeding %s.', data._id)
// Finish signal // Finish signal
var event_key = nodeKey + '.removeDone.' + data._id var event_key = nodeKey + '.removeDone.' + data._id
ipc.server.on(event_key, function (received) { ipc.server.on(event_key, function (received) {
if (!webtorrentnode.silent) logger.debug('Process removed torrent ' + data._id) if (!webtorrentnode.silent) logger.debug('Process removed torrent %s.', data._id)
var err = null var err = null
if (received.err) err = received.err if (received.err) err = received.err