Use async/await in lib and initializers

This commit is contained in:
Chocobozzz 2017-10-25 16:03:33 +02:00
parent eb08047657
commit f5028693a8
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
21 changed files with 721 additions and 751 deletions

View File

@ -1,5 +1,4 @@
import * as crypto from 'crypto'
import * as Promise from 'bluebird'
import { join } from 'path'
import {
@ -41,7 +40,7 @@ function checkSignature (publicKey: string, data: string, hexSignature: string)
return isValid
}
function sign (data: string|Object) {
async function sign (data: string|Object) {
const sign = crypto.createSign(SIGNATURE_ALGORITHM)
let dataString: string
@ -52,33 +51,33 @@ function sign (data: string|Object) {
dataString = JSON.stringify(data)
} catch (err) {
logger.error('Cannot sign data.', err)
return Promise.resolve('')
return ''
}
}
sign.update(dataString, 'utf8')
return getMyPrivateCert().then(myKey => {
return sign.sign(myKey, SIGNATURE_ENCODING)
})
const myKey = await getMyPrivateCert()
return await sign.sign(myKey, SIGNATURE_ENCODING)
}
function comparePassword (plainPassword: string, hashPassword: string) {
return bcryptComparePromise(plainPassword, hashPassword)
}
function createCertsIfNotExist () {
return certsExist().then(exist => {
if (exist === true) {
return undefined
}
async function createCertsIfNotExist () {
const exist = await certsExist()
if (exist === true) {
return undefined
}
return createCerts()
})
return await createCerts()
}
function cryptPassword (password: string) {
return bcryptGenSaltPromise(BCRYPT_SALT_SIZE).then(salt => bcryptHashPromise(password, salt))
async function cryptPassword (password: string) {
const salt = await bcryptGenSaltPromise(BCRYPT_SALT_SIZE)
return await bcryptHashPromise(password, salt)
}
function getMyPrivateCert () {
@ -105,51 +104,45 @@ export {
// ---------------------------------------------------------------------------
function certsExist () {
async function certsExist () {
const certPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
// If there is an error the certificates do not exist
return accessPromise(certPath)
.then(() => true)
.catch(() => false)
try {
await accessPromise(certPath)
return true
} catch {
return false
}
}
function createCerts () {
return certsExist().then(exist => {
if (exist === true) {
const errorMessage = 'Certs already exist.'
logger.warning(errorMessage)
throw new Error(errorMessage)
}
async function createCerts () {
const exist = await certsExist()
if (exist === true) {
const errorMessage = 'Certs already exist.'
logger.warning(errorMessage)
throw new Error(errorMessage)
}
logger.info('Generating a RSA key...')
logger.info('Generating a RSA key...')
const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
const genRsaOptions = {
'out': privateCertPath,
'2048': false
}
return opensslExecPromise('genrsa', genRsaOptions)
.then(() => {
logger.info('RSA key generated.')
logger.info('Managing public key...')
const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
const genRsaOptions = {
'out': privateCertPath,
'2048': false
}
const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub')
const rsaOptions = {
'in': privateCertPath,
'pubout': true,
'out': publicCertPath
}
return opensslExecPromise('rsa', rsaOptions)
.then(() => logger.info('Public key managed.'))
.catch(err => {
logger.error('Cannot create public key on this pod.')
throw err
})
})
.catch(err => {
logger.error('Cannot create private key on this pod.')
throw err
})
})
await opensslExecPromise('genrsa', genRsaOptions)
logger.info('RSA key generated.')
logger.info('Managing public key...')
const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub')
const rsaOptions = {
'in': privateCertPath,
'pubout': true,
'out': publicCertPath
}
await opensslExecPromise('rsa', rsaOptions)
}

View File

@ -73,7 +73,7 @@ function makeSecureRequest (params: MakeSecureRequestParams) {
signature
}
// If there are data informations
// If there are data information
if (params.data) {
requestParams.json.data = params.data
}

View File

@ -8,11 +8,13 @@ import { ResultList } from '../../shared'
import { VideoResolution } from '../../shared/models/videos/video-resolution.enum'
function badRequest (req: express.Request, res: express.Response, next: express.NextFunction) {
res.type('json').status(400).end()
return res.type('json').status(400).end()
}
function generateRandomString (size: number) {
return pseudoRandomBytesPromise(size).then(raw => raw.toString('hex'))
async function generateRandomString (size: number) {
const raw = await pseudoRandomBytesPromise(size)
return raw.toString('hex')
}
interface FormattableToJSON {
@ -34,19 +36,19 @@ function getFormattedObjects<U, T extends FormattableToJSON> (objects: T[], obje
return res
}
function isSignupAllowed () {
async function isSignupAllowed () {
if (CONFIG.SIGNUP.ENABLED === false) {
return Promise.resolve(false)
return false
}
// No limit and signup is enabled
if (CONFIG.SIGNUP.LIMIT === -1) {
return Promise.resolve(true)
return true
}
return db.User.countTotal().then(totalUsers => {
return totalUsers < CONFIG.SIGNUP.LIMIT
})
const totalUsers = await db.User.countTotal()
return totalUsers < CONFIG.SIGNUP.LIMIT
}
function computeResolutionsToTranscode (videoFileHeight: number) {

View File

@ -37,39 +37,37 @@ function checkMissedConfig () {
// Check the available codecs
// We get CONFIG by param to not import it in this file (import orders)
function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) {
async function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) {
const Ffmpeg = require('fluent-ffmpeg')
const getAvailableCodecsPromise = promisify0(Ffmpeg.getAvailableCodecs)
getAvailableCodecsPromise()
.then(codecs => {
if (CONFIG.TRANSCODING.ENABLED === false) return undefined
const codecs = await getAvailableCodecsPromise()
if (CONFIG.TRANSCODING.ENABLED === false) return undefined
const canEncode = [ 'libx264' ]
canEncode.forEach(codec => {
if (codecs[codec] === undefined) {
throw new Error('Unknown codec ' + codec + ' in FFmpeg.')
}
const canEncode = [ 'libx264' ]
for (const codec of canEncode) {
if (codecs[codec] === undefined) {
throw new Error('Unknown codec ' + codec + ' in FFmpeg.')
}
if (codecs[codec].canEncode !== true) {
throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg')
}
})
})
if (codecs[codec].canEncode !== true) {
throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg')
}
}
}
// We get db by param to not import it in this file (import orders)
function clientsExist (OAuthClient: OAuthClientModel) {
return OAuthClient.countTotal().then(totalClients => {
return totalClients !== 0
})
async function clientsExist (OAuthClient: OAuthClientModel) {
const totalClients = await OAuthClient.countTotal()
return totalClients !== 0
}
// We get db by param to not import it in this file (import orders)
function usersExist (User: UserModel) {
return User.countTotal().then(totalUsers => {
return totalUsers !== 0
})
async function usersExist (User: UserModel) {
const totalUsers = await User.countTotal()
return totalUsers !== 0
}
// ---------------------------------------------------------------------------

View File

@ -2,7 +2,7 @@ import { join } from 'path'
import { flattenDepth } from 'lodash'
require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string
import * as Sequelize from 'sequelize'
import * as Promise from 'bluebird'
import * as Bluebird from 'bluebird'
import { CONFIG } from './constants'
// Do not use barrel, we need to load database first
@ -77,26 +77,26 @@ const sequelize = new Sequelize(dbname, username, password, {
database.sequelize = sequelize
database.init = (silent: boolean) => {
database.init = async (silent: boolean) => {
const modelDirectory = join(__dirname, '..', 'models')
return getModelFiles(modelDirectory).then(filePaths => {
filePaths.forEach(filePath => {
const model = sequelize.import(filePath)
const filePaths = await getModelFiles(modelDirectory)
database[model['name']] = model
})
for (const filePath of filePaths) {
const model = sequelize.import(filePath)
Object.keys(database).forEach(modelName => {
if ('associate' in database[modelName]) {
database[modelName].associate(database)
}
})
database[model['name']] = model
}
if (!silent) logger.info('Database %s is ready.', dbname)
for (const modelName of Object.keys(database)) {
if ('associate' in database[modelName]) {
database[modelName].associate(database)
}
}
return undefined
})
if (!silent) logger.info('Database %s is ready.', dbname)
return undefined
}
// ---------------------------------------------------------------------------
@ -107,31 +107,29 @@ export {
// ---------------------------------------------------------------------------
function getModelFiles (modelDirectory: string) {
return readdirPromise(modelDirectory)
.then(files => {
const directories: string[] = files.filter(directory => {
// Find directories
if (
directory.endsWith('.js.map') ||
directory === 'index.js' || directory === 'index.ts' ||
directory === 'utils.js' || directory === 'utils.ts'
) return false
async function getModelFiles (modelDirectory: string) {
const files = await readdirPromise(modelDirectory)
const directories = files.filter(directory => {
// Find directories
if (
directory.endsWith('.js.map') ||
directory === 'index.js' || directory === 'index.ts' ||
directory === 'utils.js' || directory === 'utils.ts'
) return false
return true
})
return true
})
return directories
})
.then(directories => {
const tasks = []
const tasks: Bluebird<any>[] = []
// For each directory we read it and append model in the modelFilePaths array
directories.forEach(directory => {
const modelDirectoryPath = join(modelDirectory, directory)
// For each directory we read it and append model in the modelFilePaths array
for (const directory of directories) {
const modelDirectoryPath = join(modelDirectory, directory)
const promise = readdirPromise(modelDirectoryPath).then(files => {
const filteredFiles = files.filter(file => {
const promise = readdirPromise(modelDirectoryPath)
.then(files => {
const filteredFiles = files
.filter(file => {
if (
file === 'index.js' || file === 'index.ts' ||
file === 'utils.js' || file === 'utils.ts' ||
@ -140,17 +138,15 @@ function getModelFiles (modelDirectory: string) {
) return false
return true
}).map(file => join(modelDirectoryPath, file))
})
.map(file => join(modelDirectoryPath, file))
return filteredFiles
})
tasks.push(promise)
return filteredFiles
})
return Promise.all(tasks)
})
.then((filteredFiles: string[][]) => {
return flattenDepth<string>(filteredFiles, 1)
})
tasks.push(promise)
}
const filteredFilesArray: string[][] = await Promise.all(tasks)
return flattenDepth<string>(filteredFilesArray, 1)
}

View File

@ -1,4 +1,4 @@
// Constants first, databse in second!
// Constants first, database in second!
export * from './constants'
export * from './database'
export * from './checker'

View File

@ -1,5 +1,5 @@
import * as passwordGenerator from 'password-generator'
import * as Promise from 'bluebird'
import * as Bluebird from 'bluebird'
import { database as db } from './database'
import { USER_ROLES, CONFIG, LAST_MIGRATION_VERSION, CACHE } from './constants'
@ -7,13 +7,13 @@ import { clientsExist, usersExist } from './checker'
import { logger, createCertsIfNotExist, mkdirpPromise, rimrafPromise } from '../helpers'
import { createUserAuthorAndChannel } from '../lib'
function installApplication () {
return db.sequelize.sync()
.then(() => removeCacheDirectories())
.then(() => createDirectoriesIfNotExist())
.then(() => createCertsIfNotExist())
.then(() => createOAuthClientIfNotExist())
.then(() => createOAuthAdminIfNotExist())
async function installApplication () {
await db.sequelize.sync()
await removeCacheDirectories()
await createDirectoriesIfNotExist()
await createCertsIfNotExist()
await createOAuthClientIfNotExist()
await createOAuthAdminIfNotExist()
}
// ---------------------------------------------------------------------------
@ -27,13 +27,13 @@ export {
function removeCacheDirectories () {
const cacheDirectories = CACHE.DIRECTORIES
const tasks = []
const tasks: Bluebird<any>[] = []
// Cache directories
Object.keys(cacheDirectories).forEach(key => {
for (const key of Object.keys(cacheDirectories)) {
const dir = cacheDirectories[key]
tasks.push(rimrafPromise(dir))
})
}
return Promise.all(tasks)
}
@ -43,88 +43,83 @@ function createDirectoriesIfNotExist () {
const cacheDirectories = CACHE.DIRECTORIES
const tasks = []
Object.keys(storage).forEach(key => {
for (const key of Object.keys(storage)) {
const dir = storage[key]
tasks.push(mkdirpPromise(dir))
})
}
// Cache directories
Object.keys(cacheDirectories).forEach(key => {
for (const key of Object.keys(cacheDirectories)) {
const dir = cacheDirectories[key]
tasks.push(mkdirpPromise(dir))
})
}
return Promise.all(tasks)
}
function createOAuthClientIfNotExist () {
return clientsExist(db.OAuthClient).then(exist => {
// Nothing to do, clients already exist
if (exist === true) return undefined
async function createOAuthClientIfNotExist () {
const exist = await clientsExist(db.OAuthClient)
// Nothing to do, clients already exist
if (exist === true) return undefined
logger.info('Creating a default OAuth Client.')
logger.info('Creating a default OAuth Client.')
const id = passwordGenerator(32, false, /[a-z0-9]/)
const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/)
const client = db.OAuthClient.build({
clientId: id,
clientSecret: secret,
grants: [ 'password', 'refresh_token' ],
redirectUris: null
})
return client.save().then(createdClient => {
logger.info('Client id: ' + createdClient.clientId)
logger.info('Client secret: ' + createdClient.clientSecret)
return undefined
})
const id = passwordGenerator(32, false, /[a-z0-9]/)
const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/)
const client = db.OAuthClient.build({
clientId: id,
clientSecret: secret,
grants: [ 'password', 'refresh_token' ],
redirectUris: null
})
const createdClient = await client.save()
logger.info('Client id: ' + createdClient.clientId)
logger.info('Client secret: ' + createdClient.clientSecret)
return undefined
}
function createOAuthAdminIfNotExist () {
return usersExist(db.User).then(exist => {
// Nothing to do, users already exist
if (exist === true) return undefined
async function createOAuthAdminIfNotExist () {
const exist = await usersExist(db.User)
// Nothing to do, users already exist
if (exist === true) return undefined
logger.info('Creating the administrator.')
logger.info('Creating the administrator.')
const username = 'root'
const role = USER_ROLES.ADMIN
const email = CONFIG.ADMIN.EMAIL
let validatePassword = true
let password = ''
const username = 'root'
const role = USER_ROLES.ADMIN
const email = CONFIG.ADMIN.EMAIL
let validatePassword = true
let password = ''
// Do not generate a random password for tests
if (process.env.NODE_ENV === 'test') {
password = 'test'
// Do not generate a random password for tests
if (process.env.NODE_ENV === 'test') {
password = 'test'
if (process.env.NODE_APP_INSTANCE) {
password += process.env.NODE_APP_INSTANCE
}
// Our password is weak so do not validate it
validatePassword = false
} else {
password = passwordGenerator(8, true)
if (process.env.NODE_APP_INSTANCE) {
password += process.env.NODE_APP_INSTANCE
}
const userData = {
username,
email,
password,
role,
videoQuota: -1
}
const user = db.User.build(userData)
// Our password is weak so do not validate it
validatePassword = false
} else {
password = passwordGenerator(8, true)
}
return createUserAuthorAndChannel(user, validatePassword)
.then(({ user }) => {
logger.info('Username: ' + username)
logger.info('User password: ' + password)
const userData = {
username,
email,
password,
role,
videoQuota: -1
}
const user = db.User.build(userData)
logger.info('Creating Application table.')
return db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION })
})
})
await createUserAuthorAndChannel(user, validatePassword)
logger.info('Username: ' + username)
logger.info('User password: ' + password)
logger.info('Creating Application table.')
await db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION })
}

View File

@ -1,52 +1,35 @@
import * as path from 'path'
import * as Promise from 'bluebird'
import { database as db } from './database'
import { LAST_MIGRATION_VERSION } from './constants'
import { logger, readdirPromise } from '../helpers'
function migrate () {
const p = db.sequelize.getQueryInterface().showAllTables()
.then(tables => {
// No tables, we don't need to migrate anything
// The installer will do that
if (tables.length === 0) throw null
})
.then(() => {
return db.Application.loadMigrationVersion()
})
.then(actualVersion => {
if (actualVersion === null) {
return db.Application.create({ migrationVersion: 0 }).then(() => 0)
}
async function migrate () {
const tables = await db.sequelize.getQueryInterface().showAllTables()
return actualVersion
})
.then(actualVersion => {
// No need migrations, abort
if (actualVersion >= LAST_MIGRATION_VERSION) throw null
// No tables, we don't need to migrate anything
// The installer will do that
if (tables.length === 0) return
return actualVersion
})
.then(actualVersion => {
// If there are a new migration scripts
logger.info('Begin migrations.')
let actualVersion = await db.Application.loadMigrationVersion()
if (actualVersion === null) {
await db.Application.create({ migrationVersion: 0 })
actualVersion = 0
}
return getMigrationScripts().then(migrationScripts => ({ actualVersion, migrationScripts }))
})
.then(({ actualVersion, migrationScripts }) => {
return Promise.each(migrationScripts, entity => executeMigration(actualVersion, entity))
})
.then(() => {
logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION)
})
.catch(err => {
if (err === null) return undefined
// No need migrations, abort
if (actualVersion >= LAST_MIGRATION_VERSION) return
throw err
})
// If there are a new migration scripts
logger.info('Begin migrations.')
return p
const migrationScripts = await getMigrationScripts()
for (const migrationScript of migrationScripts) {
await executeMigration(actualVersion, migrationScript)
}
logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION)
}
// ---------------------------------------------------------------------------
@ -57,29 +40,28 @@ export {
// ---------------------------------------------------------------------------
function getMigrationScripts () {
return readdirPromise(path.join(__dirname, 'migrations')).then(files => {
const filesToMigrate: {
version: string,
script: string
}[] = []
async function getMigrationScripts () {
const files = await readdirPromise(path.join(__dirname, 'migrations'))
const filesToMigrate: {
version: string,
script: string
}[] = []
files
.filter(file => file.endsWith('.js.map') === false)
.forEach(file => {
// Filename is something like 'version-blabla.js'
const version = file.split('-')[0]
filesToMigrate.push({
version,
script: file
})
files
.filter(file => file.endsWith('.js.map') === false)
.forEach(file => {
// Filename is something like 'version-blabla.js'
const version = file.split('-')[0]
filesToMigrate.push({
version,
script: file
})
})
return filesToMigrate
})
return filesToMigrate
}
function executeMigration (actualVersion: number, entity: { version: string, script: string }) {
async function executeMigration (actualVersion: number, entity: { version: string, script: string }) {
const versionScript = parseInt(entity.version, 10)
// Do not execute old migration scripts
@ -91,7 +73,7 @@ function executeMigration (actualVersion: number, entity: { version: string, scr
const migrationScript = require(path.join(__dirname, 'migrations', migrationScriptName))
return db.sequelize.transaction(t => {
await db.sequelize.transaction(async t => {
const options = {
transaction: t,
queryInterface: db.sequelize.getQueryInterface(),
@ -99,10 +81,9 @@ function executeMigration (actualVersion: number, entity: { version: string, scr
db
}
return migrationScript.up(options)
.then(() => {
// Update the new migration version
return db.Application.updateMigrationVersion(versionScript, t)
})
await migrationScript.up(options)
// Update the new migration version
await db.Application.updateMigrationVersion(versionScript, t)
})
}

View File

@ -1,7 +1,6 @@
import * as asyncLRU from 'async-lru'
import { join } from 'path'
import { createWriteStream } from 'fs'
import * as Promise from 'bluebird'
import { database as db, CONFIG, CACHE } from '../../initializers'
import { logger, unlinkPromise } from '../../helpers'
@ -43,15 +42,15 @@ class VideosPreviewCache {
})
}
private loadPreviews (key: string) {
return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key)
.then(video => {
if (!video) return undefined
private async loadPreviews (key: string) {
const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key)
if (!video) return undefined
if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName())
if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName())
return this.saveRemotePreviewAndReturnPath(video)
})
const res = await this.saveRemotePreviewAndReturnPath(video)
return res
}
private saveRemotePreviewAndReturnPath (video: VideoInstance) {

View File

@ -1,6 +1,6 @@
import * as request from 'request'
import * as Sequelize from 'sequelize'
import * as Promise from 'bluebird'
import * as Bluebird from 'bluebird'
import { join } from 'path'
import { database as db } from '../initializers/database'
@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.
function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
const tasks = []
eventsParams.forEach(eventParams => {
for (const eventParams of eventsParams) {
tasks.push(addEventToRemoteVideo(eventParams, transaction))
})
}
return Promise.all(tasks)
}
function hasFriends () {
return db.Pod.countAll().then(count => count !== 0)
async function hasFriends () {
const count = await db.Pod.countAll()
return count !== 0
}
function makeFriends (hosts: string[]) {
async function makeFriends (hosts: string[]) {
const podsScore = {}
logger.info('Make friends!')
return getMyPublicCert()
.then(cert => {
return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert)
})
.then(cert => {
logger.debug('Pods scores computed.', { podsScore: podsScore })
const podsList = computeWinningPods(hosts, podsScore)
logger.debug('Pods that we keep.', { podsToKeep: podsList })
const cert = await getMyPublicCert()
return makeRequestsToWinningPods(cert, podsList)
})
for (const host of hosts) {
await computeForeignPodsList(host, podsScore)
}
logger.debug('Pods scores computed.', { podsScore: podsScore })
const podsList = computeWinningPods(hosts, podsScore)
logger.debug('Pods that we keep.', { podsToKeep: podsList })
return makeRequestsToWinningPods(cert, podsList)
}
function quitFriends () {
async function quitFriends () {
// Stop pool requests
requestScheduler.deactivate()
return requestScheduler.flush()
.then(() => {
return requestVideoQaduScheduler.flush()
})
.then(() => {
return db.Pod.list()
})
.then(pods => {
const requestParams = {
method: 'POST' as 'POST',
path: '/api/' + API_VERSION + '/remote/pods/remove',
toPod: null
}
try {
await requestScheduler.flush()
// Announce we quit them
// We don't care if the request fails
// The other pod will exclude us automatically after a while
return Promise.map(pods, pod => {
await requestVideoQaduScheduler.flush()
const pods = await db.Pod.list()
const requestParams = {
method: 'POST' as 'POST',
path: '/api/' + API_VERSION + '/remote/pods/remove',
toPod: null
}
// Announce we quit them
// We don't care if the request fails
// The other pod will exclude us automatically after a while
try {
await Bluebird.map(pods, pod => {
requestParams.toPod = pod
return makeSecureRequest(requestParams)
}, { concurrency: REQUESTS_IN_PARALLEL })
.then(() => pods)
.catch(err => {
logger.error('Some errors while quitting friends.', err)
// Don't stop the process
return pods
})
})
.then(pods => {
const tasks = []
pods.forEach(pod => tasks.push(pod.destroy()))
} catch (err) { // Don't stop the process
logger.error('Some errors while quitting friends.', err)
}
return Promise.all(pods)
})
.then(() => {
logger.info('Removed all remote videos.')
// Don't forget to re activate the scheduler, even if there was an error
return requestScheduler.activate()
})
.finally(() => requestScheduler.activate())
const tasks = []
for (const pod of pods) {
tasks.push(pod.destroy())
}
await Promise.all(pods)
logger.info('Removed all remote videos.')
requestScheduler.activate()
} catch (err) {
// Don't forget to re activate the scheduler, even if there was an error
requestScheduler.activate()
throw err
}
}
function sendOwnedDataToPod (podId: number) {
async function sendOwnedDataToPod (podId: number) {
// First send authors
return sendOwnedAuthorsToPod(podId)
.then(() => sendOwnedChannelsToPod(podId))
.then(() => sendOwnedVideosToPod(podId))
await sendOwnedAuthorsToPod(podId)
await sendOwnedChannelsToPod(podId)
await sendOwnedVideosToPod(podId)
}
function sendOwnedChannelsToPod (podId: number) {
return db.VideoChannel.listOwned()
.then(videoChannels => {
const tasks = []
videoChannels.forEach(videoChannel => {
const remoteVideoChannel = videoChannel.toAddRemoteJSON()
async function sendOwnedChannelsToPod (podId: number) {
const videoChannels = await db.VideoChannel.listOwned()
const tasks: Promise<any>[] = []
for (const videoChannel of videoChannels) {
const remoteVideoChannel = videoChannel.toAddRemoteJSON()
const options = {
type: 'add-channel' as 'add-channel',
endpoint: REQUEST_ENDPOINTS.VIDEOS,
data: remoteVideoChannel,
toIds: [ podId ],
transaction: null
}
const p = createRequest(options)
tasks.push(p)
}
await Promise.all(tasks)
}
async function sendOwnedAuthorsToPod (podId: number) {
const authors = await db.Author.listOwned()
const tasks: Promise<any>[] = []
for (const author of authors) {
const remoteAuthor = author.toAddRemoteJSON()
const options = {
type: 'add-author' as 'add-author',
endpoint: REQUEST_ENDPOINTS.VIDEOS,
data: remoteAuthor,
toIds: [ podId ],
transaction: null
}
const p = createRequest(options)
tasks.push(p)
}
await Promise.all(tasks)
}
async function sendOwnedVideosToPod (podId: number) {
const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags()
const tasks: Bluebird<any>[] = []
for (const video of videosList) {
const promise = video.toAddRemoteJSON()
.then(remoteVideo => {
const options = {
type: 'add-channel' as 'add-channel',
type: 'add-video' as 'add-video',
endpoint: REQUEST_ENDPOINTS.VIDEOS,
data: remoteVideoChannel,
data: remoteVideo,
toIds: [ podId ],
transaction: null
}
const p = createRequest(options)
tasks.push(p)
return createRequest(options)
})
.catch(err => {
logger.error('Cannot convert video to remote.', err)
// Don't break the process
return undefined
})
return Promise.all(tasks)
})
}
tasks.push(promise)
}
function sendOwnedAuthorsToPod (podId: number) {
return db.Author.listOwned()
.then(authors => {
const tasks = []
authors.forEach(author => {
const remoteAuthor = author.toAddRemoteJSON()
const options = {
type: 'add-author' as 'add-author',
endpoint: REQUEST_ENDPOINTS.VIDEOS,
data: remoteAuthor,
toIds: [ podId ],
transaction: null
}
const p = createRequest(options)
tasks.push(p)
})
return Promise.all(tasks)
})
}
function sendOwnedVideosToPod (podId: number) {
return db.Video.listOwnedAndPopulateAuthorAndTags()
.then(videosList => {
const tasks = []
videosList.forEach(video => {
const promise = video.toAddRemoteJSON()
.then(remoteVideo => {
const options = {
type: 'add-video' as 'add-video',
endpoint: REQUEST_ENDPOINTS.VIDEOS,
data: remoteVideo,
toIds: [ podId ],
transaction: null
}
return createRequest(options)
})
.catch(err => {
logger.error('Cannot convert video to remote.', err)
// Don't break the process
return undefined
})
tasks.push(promise)
})
return Promise.all(tasks)
})
await Promise.all(tasks)
}
function fetchRemotePreview (video: VideoInstance) {
@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) {
return request.get(REMOTE_SCHEME.HTTP + '://' + host + path)
}
function removeFriend (pod: PodInstance) {
async function removeFriend (pod: PodInstance) {
const requestParams = {
method: 'POST' as 'POST',
path: '/api/' + API_VERSION + '/remote/pods/remove',
toPod: pod
}
return makeSecureRequest(requestParams)
.catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err))
.then(() => pod.destroy())
.then(() => logger.info('Removed friend %s.', pod.host))
.catch(err => logger.error('Cannot destroy friend %s.', pod.host, err))
try {
await makeSecureRequest(requestParams)
} catch (err) {
logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)
}
try {
await pod.destroy()
logger.info('Removed friend %s.', pod.host)
} catch (err) {
logger.error('Cannot destroy friend %s.', pod.host, err)
}
}
function getRequestScheduler () {
@ -406,23 +413,21 @@ export {
// ---------------------------------------------------------------------------
function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
// TODO: type res
return getForeignPodsList(host).then(res => {
const foreignPodsList: { host: string }[] = res.data
async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
const result = await getForeignPodsList(host)
const foreignPodsList: { host: string }[] = result.data
// Let's give 1 point to the pod we ask the friends list
foreignPodsList.push({ host })
// Let's give 1 point to the pod we ask the friends list
foreignPodsList.push({ host })
foreignPodsList.forEach(foreignPod => {
const foreignPodHost = foreignPod.host
for (const foreignPod of foreignPodsList) {
const foreignPodHost = foreignPod.host
if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
else podsScore[foreignPodHost] = 1
})
if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
else podsScore[foreignPodHost] = 1
}
return undefined
})
return undefined
}
function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
const podsList = []
const baseScore = hosts.length / 2
Object.keys(podsScore).forEach(podHost => {
for (const podHost of Object.keys(podsScore)) {
// If the pod is not me and with a good score we add it
if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
podsList.push({ host: podHost })
}
})
}
return podsList
}
@ -449,7 +454,7 @@ function getForeignPodsList (host: string) {
if (err) return rej(err)
try {
const json = JSON.parse(body)
const json: ResultList<FormattedPod> = JSON.parse(body)
return res(json)
} catch (err) {
return rej(err)
@ -458,53 +463,53 @@ function getForeignPodsList (host: string) {
})
}
function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
// Stop pool requests
requestScheduler.deactivate()
// Flush pool requests
requestScheduler.forceSend()
return Promise.map(podsList, pod => {
const params = {
url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
method: 'POST' as 'POST',
json: {
host: CONFIG.WEBSERVER.HOST,
email: CONFIG.ADMIN.EMAIL,
publicKey: cert
}
}
return makeRetryRequest(params)
.then(({ response, body }) => {
body = body as { cert: string, email: string }
if (response.statusCode === 200) {
const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
return podObj.save()
.then(podCreated => {
// Add our videos to the request scheduler
sendOwnedDataToPod(podCreated.id)
})
.catch(err => {
logger.error('Cannot add friend %s pod.', pod.host, err)
})
} else {
logger.error('Status not 200 for %s pod.', pod.host)
try {
await Bluebird.map(podsList, async pod => {
const params = {
url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
method: 'POST' as 'POST',
json: {
host: CONFIG.WEBSERVER.HOST,
email: CONFIG.ADMIN.EMAIL,
publicKey: cert
}
})
.catch(err => {
logger.error('Error with adding %s pod.', pod.host, { error: err.stack })
// Don't break the process
})
}, { concurrency: REQUESTS_IN_PARALLEL })
.then(() => logger.debug('makeRequestsToWinningPods finished.'))
.finally(() => {
}
const { response, body } = await makeRetryRequest(params)
const typedBody = body as { cert: string, email: string }
if (response.statusCode === 200) {
const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email })
let podCreated: PodInstance
try {
podCreated = await podObj.save()
} catch (err) {
logger.error('Cannot add friend %s pod.', pod.host, err)
}
// Add our videos to the request scheduler
sendOwnedDataToPod(podCreated.id)
.catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err))
} else {
logger.error('Status not 200 for %s pod.', pod.host)
}
}, { concurrency: REQUESTS_IN_PARALLEL })
logger.debug('makeRequestsToWinningPods finished.')
requestScheduler.activate()
} catch (err) {
// Final callback, we've ended all the requests
// Now we made new friends, we can re activate the pool of requests
requestScheduler.activate()
})
}
}
// Wrapper that populate "toIds" argument with all our friends if it is not specified
@ -515,14 +520,19 @@ type CreateRequestOptions = {
toIds?: number[]
transaction: Sequelize.Transaction
}
function createRequest (options: CreateRequestOptions) {
if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions)
async function createRequest (options: CreateRequestOptions) {
if (options.toIds !== undefined) {
await requestScheduler.createRequest(options as RequestSchedulerOptions)
return undefined
}
// If the "toIds" pods is not specified, we send the request to all our friends
return db.Pod.listAllIds(options.transaction).then(podIds => {
const newOptions = Object.assign(options, { toIds: podIds })
return requestScheduler.createRequest(newOptions)
})
const podIds = await db.Pod.listAllIds(options.transaction)
const newOptions = Object.assign(options, { toIds: podIds })
await requestScheduler.createRequest(newOptions)
return undefined
}
function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {

View File

@ -1,4 +1,4 @@
import * as Promise from 'bluebird'
import * as Bluebird from 'bluebird'
import { database as db } from '../../../initializers/database'
import { logger, computeResolutionsToTranscode } from '../../../helpers'
@ -6,16 +6,17 @@ import { VideoInstance } from '../../../models'
import { addVideoToFriends } from '../../friends'
import { JobScheduler } from '../job-scheduler'
function process (data: { videoUUID: string }, jobId: number) {
return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => {
// No video, maybe deleted?
if (!video) {
logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
return undefined
}
async function process (data: { videoUUID: string }, jobId: number) {
const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
// No video, maybe deleted?
if (!video) {
logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
return undefined
}
return video.optimizeOriginalVideofile().then(() => video)
})
await video.optimizeOriginalVideofile()
return video
}
function onError (err: Error, jobId: number) {
@ -23,33 +24,31 @@ function onError (err: Error, jobId: number) {
return Promise.resolve()
}
function onSuccess (jobId: number, video: VideoInstance) {
async function onSuccess (jobId: number, video: VideoInstance) {
if (video === undefined) return undefined
logger.info('Job %d is a success.', jobId)
video.toAddRemoteJSON()
.then(remoteVideo => {
// Now we'll add the video's meta data to our friends
return addVideoToFriends(remoteVideo, null)
})
.then(() => {
return video.getOriginalFileHeight()
})
.then(originalFileHeight => {
// Create transcoding jobs if there are enabled resolutions
const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
logger.info(
'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight,
{ resolutions: resolutionsEnabled }
)
const remoteVideo = await video.toAddRemoteJSON()
if (resolutionsEnabled.length === 0) return undefined
// Now we'll add the video's meta data to our friends
await addVideoToFriends(remoteVideo, null)
return db.sequelize.transaction(t => {
const tasks: Promise<any>[] = []
const originalFileHeight = await video.getOriginalFileHeight()
// Create transcoding jobs if there are enabled resolutions
resolutionsEnabled.forEach(resolution => {
const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
logger.info(
'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight,
{ resolutions: resolutionsEnabled }
)
if (resolutionsEnabled.length !== 0) {
try {
await db.sequelize.transaction(async t => {
const tasks: Bluebird<any>[] = []
for (const resolution of resolutionsEnabled) {
const dataInput = {
videoUUID: video.uuid,
resolution
@ -57,24 +56,19 @@ function onSuccess (jobId: number, video: VideoInstance) {
const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput)
tasks.push(p)
})
}
return Promise.all(tasks).then(() => resolutionsEnabled)
await Promise.all(tasks)
})
})
.then(resolutionsEnabled => {
if (resolutionsEnabled === undefined) {
logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
return undefined
}
logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
})
.catch((err: Error) => {
logger.debug('Cannot transcode the video.', err)
throw err
})
} catch (err) {
logger.warn('Cannot transcode the video.', err)
}
} else {
logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
return undefined
}
}
// ---------------------------------------------------------------------------

View File

@ -4,16 +4,17 @@ import { logger } from '../../../helpers'
import { VideoInstance } from '../../../models'
import { VideoResolution } from '../../../../shared'
function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => {
// No video, maybe deleted?
if (!video) {
logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
return undefined
}
async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
// No video, maybe deleted?
if (!video) {
logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
return undefined
}
return video.transcodeOriginalVideofile(data.resolution).then(() => video)
})
await video.transcodeOriginalVideofile(data.resolution)
return video
}
function onError (err: Error, jobId: number) {

View File

@ -23,7 +23,7 @@ class JobScheduler {
return this.instance || (this.instance = new this())
}
activate () {
async activate () {
const limit = JOBS_FETCH_LIMIT_PER_CYCLE
logger.info('Jobs scheduler activated.')
@ -32,32 +32,36 @@ class JobScheduler {
// Finish processing jobs from a previous start
const state = JOB_STATES.PROCESSING
db.Job.listWithLimit(limit, state)
.then(jobs => {
this.enqueueJobs(jobsQueue, jobs)
try {
const jobs = await db.Job.listWithLimit(limit, state)
forever(
next => {
if (jobsQueue.length() !== 0) {
// Finish processing the queue first
return setTimeout(next, JOBS_FETCHING_INTERVAL)
}
this.enqueueJobs(jobsQueue, jobs)
} catch (err) {
logger.error('Cannot list pending jobs.', err)
}
const state = JOB_STATES.PENDING
db.Job.listWithLimit(limit, state)
.then(jobs => {
this.enqueueJobs(jobsQueue, jobs)
forever(
async next => {
if (jobsQueue.length() !== 0) {
// Finish processing the queue first
return setTimeout(next, JOBS_FETCHING_INTERVAL)
}
// Optimization: we could use "drain" from queue object
return setTimeout(next, JOBS_FETCHING_INTERVAL)
})
.catch(err => logger.error('Cannot list pending jobs.', err))
},
const state = JOB_STATES.PENDING
try {
const jobs = await db.Job.listWithLimit(limit, state)
err => logger.error('Error in job scheduler queue.', err)
)
})
.catch(err => logger.error('Cannot list pending jobs.', err))
this.enqueueJobs(jobsQueue, jobs)
} catch (err) {
logger.error('Cannot list pending jobs.', err)
}
// Optimization: we could use "drain" from queue object
return setTimeout(next, JOBS_FETCHING_INTERVAL)
},
err => logger.error('Error in job scheduler queue.', err)
)
}
createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
@ -75,7 +79,7 @@ class JobScheduler {
jobs.forEach(job => jobsQueue.push(job))
}
private processJob (job: JobInstance, callback: (err: Error) => void) {
private async processJob (job: JobInstance, callback: (err: Error) => void) {
const jobHandler = jobHandlers[job.handlerName]
if (jobHandler === undefined) {
logger.error('Unknown job handler for job %s.', job.handlerName)
@ -85,41 +89,45 @@ class JobScheduler {
logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
job.state = JOB_STATES.PROCESSING
return job.save()
.then(() => {
return jobHandler.process(job.handlerInputData, job.id)
})
.then(
result => {
return this.onJobSuccess(jobHandler, job, result)
},
await job.save()
err => {
logger.error('Error in job handler %s.', job.handlerName, err)
return this.onJobError(jobHandler, job, err)
}
)
.then(() => callback(null))
.catch(err => {
this.cannotSaveJobError(err)
return callback(err)
})
try {
const result = await jobHandler.process(job.handlerInputData, job.id)
await this.onJobSuccess(jobHandler, job, result)
} catch (err) {
logger.error('Error in job handler %s.', job.handlerName, err)
try {
await this.onJobError(jobHandler, job, err)
} catch (innerErr) {
this.cannotSaveJobError(innerErr)
return callback(innerErr)
}
}
callback(null)
}
private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
job.state = JOB_STATES.ERROR
return job.save()
.then(() => jobHandler.onError(err, job.id))
.catch(err => this.cannotSaveJobError(err))
try {
await job.save()
await jobHandler.onError(err, job.id)
} catch (err) {
this.cannotSaveJobError(err)
}
}
private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
job.state = JOB_STATES.SUCCESS
return job.save()
.then(() => jobHandler.onSuccess(job.id, jobResult))
.catch(err => this.cannotSaveJobError(err))
try {
await job.save()
jobHandler.onSuccess(job.id, jobResult)
} catch (err) {
this.cannotSaveJobError(err)
}
}
private cannotSaveJobError (err: Error) {

View File

@ -24,39 +24,36 @@ function getRefreshToken (refreshToken: string) {
return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken)
}
function getUser (username: string, password: string) {
async function getUser (username: string, password: string) {
logger.debug('Getting User (username: ' + username + ', password: ******).')
return db.User.getByUsername(username).then(user => {
if (!user) return null
const user = await db.User.getByUsername(username)
if (!user) return null
return user.isPasswordMatch(password).then(passwordMatch => {
if (passwordMatch === false) return null
const passwordMatch = await user.isPasswordMatch(password)
if (passwordMatch === false) return null
return user
})
})
return user
}
function revokeToken (token: TokenInfo) {
return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(tokenDB => {
if (tokenDB) tokenDB.destroy()
async function revokeToken (tokenInfo: TokenInfo) {
const token = await db.OAuthToken.getByRefreshTokenAndPopulateUser(tokenInfo.refreshToken)
if (token) token.destroy()
/*
* Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js
* "As per the discussion we need set older date
* revokeToken will expected return a boolean in future version
* https://github.com/oauthjs/node-oauth2-server/pull/274
* https://github.com/oauthjs/node-oauth2-server/issues/290"
*/
const expiredToken = tokenDB
expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z')
/*
* Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js
* "As per the discussion we need set older date
* revokeToken will expected return a boolean in future version
* https://github.com/oauthjs/node-oauth2-server/pull/274
* https://github.com/oauthjs/node-oauth2-server/issues/290"
*/
const expiredToken = token
expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z')
return expiredToken
})
return expiredToken
}
function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) {
async function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) {
logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.')
const tokenToCreate = {
@ -68,11 +65,10 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns
userId: user.id
}
return db.OAuthToken.create(tokenToCreate).then(tokenCreated => {
const tokenToReturn = Object.assign(tokenCreated, { client, user })
const tokenCreated = await db.OAuthToken.create(tokenToCreate)
const tokenToReturn = Object.assign(tokenCreated, { client, user })
return tokenToReturn
})
return tokenToReturn
}
// ---------------------------------------------------------------------------

View File

@ -1,5 +1,5 @@
import { isEmpty } from 'lodash'
import * as Promise from 'bluebird'
import * as Bluebird from 'bluebird'
import { database as db } from '../../initializers/database'
import { logger, makeSecureRequest } from '../../helpers'
@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler <T> {
// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
const params = {
toPod: toPod,
method: 'POST' as 'POST',
@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler <T> {
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
return makeSecureRequest(params)
.then(({ response, body }) => {
if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
throw new Error('Status code not 20x : ' + response.statusCode)
}
})
.catch(err => {
logger.error('Error sending secure request to %s pod.', toPod.host, err)
try {
const { response } = await makeSecureRequest(params)
if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
throw new Error('Status code not 20x : ' + response.statusCode)
}
} catch (err) {
logger.error('Error sending secure request to %s pod.', toPod.host, err)
throw err
})
throw err
}
}
// Make all the requests of the scheduler
protected makeRequests () {
return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
.then((requestsGrouped: T) => {
// We want to group requests by destinations pod and endpoint
const requestsToMake = this.buildRequestsObjects(requestsGrouped)
protected async makeRequests () {
let requestsGrouped: T
// If there are no requests, abort
if (isEmpty(requestsToMake) === true) {
logger.info('No "%s" to make.', this.description)
return { goodPods: [], badPods: [] }
}
try {
requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
} catch (err) {
logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })
throw err
}
logger.info('Making "%s" to friends.', this.description)
// We want to group requests by destinations pod and endpoint
const requestsToMake = this.buildRequestsObjects(requestsGrouped)
const goodPods: number[] = []
const badPods: number[] = []
// If there are no requests, abort
if (isEmpty(requestsToMake) === true) {
logger.info('No "%s" to make.', this.description)
return { goodPods: [], badPods: [] }
}
return Promise.map(Object.keys(requestsToMake), hashKey => {
const requestToMake = requestsToMake[hashKey]
const toPod: PodInstance = requestToMake.toPod
logger.info('Making "%s" to friends.', this.description)
return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
.then(() => {
logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
goodPods.push(requestToMake.toPod.id)
const goodPods: number[] = []
const badPods: number[] = []
this.afterRequestHook()
await Bluebird.map(Object.keys(requestsToMake), async hashKey => {
const requestToMake = requestsToMake[hashKey]
const toPod: PodInstance = requestToMake.toPod
// Remove the pod id of these request ids
return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
})
.catch(err => {
badPods.push(requestToMake.toPod.id)
logger.info('Cannot make request to %s.', toPod.host, err)
})
}, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods }))
})
.then(({ goodPods, badPods }) => {
this.afterRequestsHook()
try {
await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
goodPods.push(requestToMake.toPod.id)
// All the requests were made, we update the pods score
return db.Pod.updatePodsScore(goodPods, badPods)
})
.catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }))
this.afterRequestHook()
// Remove the pod id of these request ids
await this.getRequestToPodModel()
.removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
} catch (err) {
badPods.push(requestToMake.toPod.id)
logger.info('Cannot make request to %s.', toPod.host, err)
}
}, { concurrency: REQUESTS_IN_PARALLEL })
this.afterRequestsHook()
// All the requests were made, we update the pods score
await db.Pod.updatePodsScore(goodPods, badPods)
}
protected afterRequestHook () {
// Nothing to do, let children reimplement it
// Nothing to do, let children re-implement it
}
protected afterRequestsHook () {
// Nothing to do, let children reimplement it
// Nothing to do, let children re-implement it
}
}

View File

@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
buildRequestsObjects (requestsGrouped: RequestsGrouped) {
const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {}
Object.keys(requestsGrouped).forEach(toPodId => {
requestsGrouped[toPodId].forEach(data => {
for (const toPodId of Object.keys(requestsGrouped)) {
for (const data of requestsGrouped[toPodId]) {
const request = data.request
const pod = data.pod
const hashKey = toPodId + request.endpoint
@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
requestsToMakeGrouped[hashKey].ids.push(request.id)
requestsToMakeGrouped[hashKey].datas.push(request.request)
})
})
}
}
return requestsToMakeGrouped
}
createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
// If there are no destination pods abort
if (toIds.length === 0) return undefined
@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
transaction
}
return db.Request.create(createQuery, dbRequestOptions)
.then(request => {
return request.setPods(toIds, dbRequestOptions)
})
const request = await db.Request.create(createQuery, dbRequestOptions)
await request.setPods(toIds, dbRequestOptions)
}
// ---------------------------------------------------------------------------

View File

@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
// We group video events per video and per pod
// We add the counts of the same event types
Object.keys(eventRequests).forEach(toPodId => {
eventRequests[toPodId].forEach(eventToProcess => {
for (const toPodId of Object.keys(eventRequests)) {
for (const eventToProcess of eventRequests[toPodId]) {
if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
if (!requestsToMakeGrouped[toPodId]) {
@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
if (!events[eventToProcess.type]) events[eventToProcess.type] = 0
events[eventToProcess.type] += eventToProcess.count
})
})
}
}
// Now we build our requests array per pod
Object.keys(eventsPerVideoPerPod).forEach(toPodId => {
for (const toPodId of Object.keys(eventsPerVideoPerPod)) {
const eventsForPod = eventsPerVideoPerPod[toPodId]
Object.keys(eventsForPod).forEach(uuid => {
for (const uuid of Object.keys(eventsForPod)) {
const eventsForVideo = eventsForPod[uuid]
Object.keys(eventsForVideo).forEach(eventType => {
for (const eventType of Object.keys(eventsForVideo)) {
requestsToMakeGrouped[toPodId].datas.push({
data: {
uuid,
@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
count: +eventsForVideo[eventType]
}
})
})
})
})
}
}
}
return requestsToMakeGrouped
}

View File

@ -59,8 +59,8 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
buildRequestsObjects (requests: RequestsVideoQaduGrouped) {
const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {}
Object.keys(requests).forEach(toPodId => {
requests[toPodId].forEach(data => {
for (const toPodId of Object.keys(requests)) {
for (const data of requests[toPodId]) {
const request = data.request
const video = data.video
const pod = data.pod
@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
// Maybe there are multiple quick and dirty update for the same video
// We use this hash map to dedupe them
requestsToMakeGrouped[hashKey].videos[video.id] = videoData
})
})
}
}
// Now we deduped similar quick and dirty updates, we can build our requests data
Object.keys(requestsToMakeGrouped).forEach(hashKey => {
Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => {
for (const hashKey of Object.keys(requestsToMakeGrouped)) {
for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) {
const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID]
requestsToMakeGrouped[hashKey].datas.push({
data: videoData
})
})
}
// We don't need it anymore, it was just to build our data array
delete requestsToMakeGrouped[hashKey].videos
})
}
return requestsToMakeGrouped
}
createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
const dbRequestOptions: Sequelize.BulkCreateOptions = {}
if (transaction) dbRequestOptions.transaction = transaction
// Send the update to all our friends
return db.Pod.listAllIds(transaction).then(podIds => {
const queries = []
podIds.forEach(podId => {
queries.push({ type, videoId, podId })
})
const podIds = await db.Pod.listAllIds(transaction)
const queries = []
for (const podId of podIds) {
queries.push({ type, videoId, podId })
}
return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
})
await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
return undefined
}
}

View File

@ -3,40 +3,36 @@ import { UserInstance } from '../models'
import { addVideoAuthorToFriends } from './friends'
import { createVideoChannel } from './video-channel'
function createUserAuthorAndChannel (user: UserInstance, validateUser = true) {
return db.sequelize.transaction(t => {
async function createUserAuthorAndChannel (user: UserInstance, validateUser = true) {
const res = await db.sequelize.transaction(async t => {
const userOptions = {
transaction: t,
validate: validateUser
}
return user.save(userOptions)
.then(user => {
const author = db.Author.build({
name: user.username,
podId: null, // It is our pod
userId: user.id
})
const userCreated = await user.save(userOptions)
const authorInstance = db.Author.build({
name: userCreated.username,
podId: null, // It is our pod
userId: userCreated.id
})
return author.save({ transaction: t })
.then(author => ({ author, user }))
})
.then(({ author, user }) => {
const remoteVideoAuthor = author.toAddRemoteJSON()
const authorCreated = await authorInstance.save({ transaction: t })
// Now we'll add the video channel's meta data to our friends
return addVideoAuthorToFriends(remoteVideoAuthor, t)
.then(() => ({ author, user }))
})
.then(({ author, user }) => {
const videoChannelInfo = {
name: `Default ${user.username} channel`
}
const remoteVideoAuthor = authorCreated.toAddRemoteJSON()
return createVideoChannel(videoChannelInfo, author, t)
.then(videoChannel => ({ author, user, videoChannel }))
})
// Now we'll add the video channel's meta data to our friends
const author = await addVideoAuthorToFriends(remoteVideoAuthor, t)
const videoChannelInfo = {
name: `Default ${userCreated.username} channel`
}
const videoChannel = await createVideoChannel(videoChannelInfo, authorCreated, t)
return { author, videoChannel }
})
return res
}
// ---------------------------------------------------------------------------

View File

@ -8,45 +8,44 @@ import {
} from '../helpers'
import { PodSignature } from '../../shared'
function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) {
async function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) {
const signatureObject: PodSignature = req.body.signature
const host = signatureObject.host
db.Pod.loadByHost(host)
.then(pod => {
if (pod === null) {
logger.error('Unknown pod %s.', host)
return res.sendStatus(403)
}
logger.debug('Checking signature from %s.', host)
let signatureShouldBe
// If there is data in the body the sender used it for its signature
// If there is no data we just use its host as signature
if (req.body.data) {
signatureShouldBe = req.body.data
} else {
signatureShouldBe = host
}
const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature)
if (signatureOk === true) {
res.locals.secure = {
pod
}
return next()
}
logger.error('Signature is not okay in body for %s.', signatureObject.host)
try {
const pod = await db.Pod.loadByHost(host)
if (pod === null) {
logger.error('Unknown pod %s.', host)
return res.sendStatus(403)
})
.catch(err => {
logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature })
return res.sendStatus(500)
})
}
logger.debug('Checking signature from %s.', host)
let signatureShouldBe
// If there is data in the body the sender used it for its signature
// If there is no data we just use its host as signature
if (req.body.data) {
signatureShouldBe = req.body.data
} else {
signatureShouldBe = host
}
const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature)
if (signatureOk === true) {
res.locals.secure = {
pod
}
return next()
}
logger.error('Signature is not okay in body for %s.', signatureObject.host)
return res.sendStatus(403)
} catch (err) {
logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature })
return res.sendStatus(500)
}
}
// ---------------------------------------------------------------------------

View File

@ -247,20 +247,21 @@ updatePodsScore = function (goodPods: number[], badPods: number[]) {
// ---------------------------------------------------------------------------
// Remove pods with a score of 0 (too many requests where they were unreachable)
function removeBadPods () {
return listBadPods()
.then(pods => {
const podsRemovePromises = pods.map(pod => pod.destroy())
return Promise.all(podsRemovePromises).then(() => pods.length)
})
.then(numberOfPodsRemoved => {
if (numberOfPodsRemoved) {
logger.info('Removed %d pods.', numberOfPodsRemoved)
} else {
logger.info('No need to remove bad pods.')
}
})
.catch(err => {
logger.error('Cannot remove bad pods.', err)
})
async function removeBadPods () {
try {
const pods = await listBadPods()
const podsRemovePromises = pods.map(pod => pod.destroy())
await Promise.all(podsRemovePromises)
const numberOfPodsRemoved = pods.length
if (numberOfPodsRemoved) {
logger.info('Removed %d pods.', numberOfPodsRemoved)
} else {
logger.info('No need to remove bad pods.')
}
} catch (err) {
logger.error('Cannot remove bad pods.', err)
}
}