Improve redundancy: add 'min_lifetime' configuration

This commit is contained in:
Chocobozzz 2018-09-24 13:07:33 +02:00
parent d1a63fc7ac
commit e5565833f6
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
27 changed files with 644 additions and 295 deletions

View File

@ -75,14 +75,20 @@ redundancy:
strategies: strategies:
# - # -
# size: '10GB' # size: '10GB'
# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances)
# min_lifetime: '48 hours'
# strategy: 'most-views' # Cache videos that have the most views # strategy: 'most-views' # Cache videos that have the most views
# - # -
# size: '10GB' # size: '10GB'
# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances)
# min_lifetime: '48 hours'
# strategy: 'trending' # Cache trending videos # strategy: 'trending' # Cache trending videos
# - # -
# size: '10GB' # size: '10GB'
# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances)
# min_lifetime: '48 hours'
# strategy: 'recently-added' # Cache recently added videos # strategy: 'recently-added' # Cache recently added videos
# minViews: 10 # Having at least x views # min_views: 10 # Having at least x views
cache: cache:
previews: previews:

View File

@ -76,14 +76,20 @@ redundancy:
strategies: strategies:
# - # -
# size: '10GB' # size: '10GB'
# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances)
# min_lifetime: '48 hours'
# strategy: 'most-views' # Cache videos that have the most views # strategy: 'most-views' # Cache videos that have the most views
# - # -
# size: '10GB' # size: '10GB'
# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances)
# min_lifetime: '48 hours'
# strategy: 'trending' # Cache trending videos # strategy: 'trending' # Cache trending videos
# - # -
# size: '10GB' # size: '10GB'
# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances)
# min_lifetime: '48 hours'
# strategy: 'recently-added' # Cache recently added videos # strategy: 'recently-added' # Cache recently added videos
# minViews: 10 # Having at least x views # min_views: 10 # Having at least x views
############################################################################### ###############################################################################
# #

View File

@ -23,18 +23,21 @@ log:
redundancy: redundancy:
videos: videos:
check_interval: '5 seconds' check_interval: '10 minutes'
strategies: strategies:
- -
size: '10MB' size: '10MB'
min_lifetime: '10 minutes'
strategy: 'most-views' strategy: 'most-views'
- -
size: '10MB' size: '10MB'
min_lifetime: '10 minutes'
strategy: 'trending' strategy: 'trending'
- -
size: '10MB' size: '10MB'
min_lifetime: '10 minutes'
strategy: 'recently-added' strategy: 'recently-added'
minViews: 1 min_views: 1
cache: cache:
previews: previews:

View File

@ -1,6 +1,4 @@
// FIXME: https://github.com/nodejs/node/pull/16853 // FIXME: https://github.com/nodejs/node/pull/16853
import { VideosCaptionCache } from './server/lib/cache/videos-caption-cache'
require('tls').DEFAULT_ECDH_CURVE = 'auto' require('tls').DEFAULT_ECDH_CURVE = 'auto'
import { isTestInstance } from './server/helpers/core-utils' import { isTestInstance } from './server/helpers/core-utils'
@ -17,7 +15,7 @@ import * as cors from 'cors'
import * as cookieParser from 'cookie-parser' import * as cookieParser from 'cookie-parser'
import * as helmet from 'helmet' import * as helmet from 'helmet'
import * as useragent from 'useragent' import * as useragent from 'useragent'
import * as anonymise from 'ip-anonymize' import * as anonymize from 'ip-anonymize'
process.title = 'peertube' process.title = 'peertube'
@ -25,7 +23,7 @@ process.title = 'peertube'
const app = express() const app = express()
// ----------- Core checker ----------- // ----------- Core checker -----------
import { checkMissedConfig, checkFFmpeg, checkConfig, checkActivityPubUrls } from './server/initializers/checker' import { checkMissedConfig, checkFFmpeg } from './server/initializers/checker-before-init'
// Do not use barrels because we don't want to load all modules here (we need to initialize database first) // Do not use barrels because we don't want to load all modules here (we need to initialize database first)
import { logger } from './server/helpers/logger' import { logger } from './server/helpers/logger'
@ -43,6 +41,8 @@ checkFFmpeg(CONFIG)
process.exit(-1) process.exit(-1)
}) })
import { checkConfig, checkActivityPubUrls } from './server/initializers/checker-after-init'
const errorMessage = checkConfig() const errorMessage = checkConfig()
if (errorMessage !== null) { if (errorMessage !== null) {
throw new Error(errorMessage) throw new Error(errorMessage)
@ -76,7 +76,7 @@ migrate()
import { installApplication } from './server/initializers' import { installApplication } from './server/initializers'
import { Emailer } from './server/lib/emailer' import { Emailer } from './server/lib/emailer'
import { JobQueue } from './server/lib/job-queue' import { JobQueue } from './server/lib/job-queue'
import { VideosPreviewCache } from './server/lib/cache' import { VideosPreviewCache, VideosCaptionCache } from './server/lib/cache'
import { import {
activityPubRouter, activityPubRouter,
apiRouter, apiRouter,
@ -111,7 +111,7 @@ if (isTestInstance()) {
// For the logger // For the logger
morgan.token('remote-addr', req => { morgan.token('remote-addr', req => {
return (req.get('DNT') === '1') ? return (req.get('DNT') === '1') ?
anonymise(req.ip || (req.connection && req.connection.remoteAddress) || undefined, anonymize(req.ip || (req.connection && req.connection.remoteAddress) || undefined,
16, // bitmask for IPv4 16, // bitmask for IPv4
16 // bitmask for IPv6 16 // bitmask for IPv6
) : ) :

View File

@ -4,7 +4,7 @@ import { VideoResolution } from '../../shared/models/videos'
import { CONFIG, FFMPEG_NICE, VIDEO_TRANSCODING_FPS } from '../initializers' import { CONFIG, FFMPEG_NICE, VIDEO_TRANSCODING_FPS } from '../initializers'
import { processImage } from './image-utils' import { processImage } from './image-utils'
import { logger } from './logger' import { logger } from './logger'
import { checkFFmpegEncoders } from '../initializers/checker' import { checkFFmpegEncoders } from '../initializers/checker-before-init'
import { remove } from 'fs-extra' import { remove } from 'fs-extra'
function computeResolutionsToTranscode (videoFileHeight: number) { function computeResolutionsToTranscode (videoFileHeight: number) {

View File

@ -0,0 +1,115 @@
import * as config from 'config'
import { promisify0, isProdInstance, parseDuration, isTestInstance } from '../helpers/core-utils'
import { UserModel } from '../models/account/user'
import { ApplicationModel } from '../models/application/application'
import { OAuthClientModel } from '../models/oauth/oauth-client'
import { parse } from 'url'
import { CONFIG } from './constants'
import { logger } from '../helpers/logger'
import { getServerActor } from '../helpers/utils'
import { RecentlyAddedStrategy, VideosRedundancy } from '../../shared/models/redundancy'
import { isArray } from '../helpers/custom-validators/misc'
import { uniq } from 'lodash'
async function checkActivityPubUrls () {
const actor = await getServerActor()
const parsed = parse(actor.url)
if (CONFIG.WEBSERVER.HOST !== parsed.host) {
const NODE_ENV = config.util.getEnv('NODE_ENV')
const NODE_CONFIG_DIR = config.util.getEnv('NODE_CONFIG_DIR')
logger.warn(
'It seems PeerTube was started (and created some data) with another domain name. ' +
'This means you will not be able to federate! ' +
'Please use %s %s npm run update-host to fix this.',
NODE_CONFIG_DIR ? `NODE_CONFIG_DIR=${NODE_CONFIG_DIR}` : '',
NODE_ENV ? `NODE_ENV=${NODE_ENV}` : ''
)
}
}
// Some checks on configuration files
// Return an error message, or null if everything is okay
function checkConfig () {
const defaultNSFWPolicy = CONFIG.INSTANCE.DEFAULT_NSFW_POLICY
// NSFW policy
{
const available = [ 'do_not_list', 'blur', 'display' ]
if (available.indexOf(defaultNSFWPolicy) === -1) {
return 'NSFW policy setting should be ' + available.join(' or ') + ' instead of ' + defaultNSFWPolicy
}
}
// Redundancies
const redundancyVideos = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES
if (isArray(redundancyVideos)) {
const available = [ 'most-views', 'trending', 'recently-added' ]
for (const r of redundancyVideos) {
if (available.indexOf(r.strategy) === -1) {
return 'Videos redundancy should have ' + available.join(' or ') + ' strategy instead of ' + r.strategy
}
// Lifetime should not be < 10 hours
if (!isTestInstance() && r.minLifetime < 1000 * 3600 * 10) {
return 'Video redundancy minimum lifetime should be >= 10 hours for strategy ' + r.strategy
}
}
const filtered = uniq(redundancyVideos.map(r => r.strategy))
if (filtered.length !== redundancyVideos.length) {
return 'Redundancy video entries should have unique strategies'
}
const recentlyAddedStrategy = redundancyVideos.find(r => r.strategy === 'recently-added') as RecentlyAddedStrategy
if (recentlyAddedStrategy && isNaN(recentlyAddedStrategy.minViews)) {
return 'Min views in recently added strategy is not a number'
}
}
if (isProdInstance()) {
const configStorage = config.get('storage')
for (const key of Object.keys(configStorage)) {
if (configStorage[key].startsWith('storage/')) {
logger.warn(
'Directory of %s should not be in the production directory of PeerTube. Please check your production configuration file.',
key
)
}
}
}
return null
}
// We get db by param to not import it in this file (import orders)
async function clientsExist () {
const totalClients = await OAuthClientModel.countTotal()
return totalClients !== 0
}
// We get db by param to not import it in this file (import orders)
async function usersExist () {
const totalUsers = await UserModel.countTotal()
return totalUsers !== 0
}
// We get db by param to not import it in this file (import orders)
async function applicationExist () {
const totalApplication = await ApplicationModel.countTotal()
return totalApplication !== 0
}
// ---------------------------------------------------------------------------
export {
checkConfig,
clientsExist,
usersExist,
applicationExist,
checkActivityPubUrls
}

View File

@ -1,78 +1,8 @@
import * as config from 'config' import * as config from 'config'
import { promisify0, isProdInstance } from '../helpers/core-utils' import { promisify0 } from '../helpers/core-utils'
import { UserModel } from '../models/account/user'
import { ApplicationModel } from '../models/application/application'
import { OAuthClientModel } from '../models/oauth/oauth-client'
import { parse } from 'url'
import { CONFIG } from './constants'
import { logger } from '../helpers/logger'
import { getServerActor } from '../helpers/utils'
import { RecentlyAddedStrategy, VideosRedundancy } from '../../shared/models/redundancy'
import { isArray } from '../helpers/custom-validators/misc' import { isArray } from '../helpers/custom-validators/misc'
import { uniq } from 'lodash'
async function checkActivityPubUrls () { // ONLY USE CORE MODULES IN THIS FILE!
const actor = await getServerActor()
const parsed = parse(actor.url)
if (CONFIG.WEBSERVER.HOST !== parsed.host) {
const NODE_ENV = config.util.getEnv('NODE_ENV')
const NODE_CONFIG_DIR = config.util.getEnv('NODE_CONFIG_DIR')
logger.warn(
'It seems PeerTube was started (and created some data) with another domain name. ' +
'This means you will not be able to federate! ' +
'Please use %s %s npm run update-host to fix this.',
NODE_CONFIG_DIR ? `NODE_CONFIG_DIR=${NODE_CONFIG_DIR}` : '',
NODE_ENV ? `NODE_ENV=${NODE_ENV}` : ''
)
}
}
// Some checks on configuration files
// Return an error message, or null if everything is okay
function checkConfig () {
const defaultNSFWPolicy = config.get<string>('instance.default_nsfw_policy')
// NSFW policy
if ([ 'do_not_list', 'blur', 'display' ].indexOf(defaultNSFWPolicy) === -1) {
return 'NSFW policy setting should be "do_not_list" or "blur" or "display" instead of ' + defaultNSFWPolicy
}
// Redundancies
const redundancyVideos = config.get<VideosRedundancy[]>('redundancy.videos.strategies')
if (isArray(redundancyVideos)) {
for (const r of redundancyVideos) {
if ([ 'most-views', 'trending', 'recently-added' ].indexOf(r.strategy) === -1) {
return 'Redundancy video entries should have "most-views" strategy instead of ' + r.strategy
}
}
const filtered = uniq(redundancyVideos.map(r => r.strategy))
if (filtered.length !== redundancyVideos.length) {
return 'Redundancy video entries should have unique strategies'
}
const recentlyAddedStrategy = redundancyVideos.find(r => r.strategy === 'recently-added') as RecentlyAddedStrategy
if (recentlyAddedStrategy && isNaN(recentlyAddedStrategy.minViews)) {
return 'Min views in recently added strategy is not a number'
}
}
if (isProdInstance()) {
const configStorage = config.get('storage')
for (const key of Object.keys(configStorage)) {
if (configStorage[key].startsWith('storage/')) {
logger.warn(
'Directory of %s should not be in the production directory of PeerTube. Please check your production configuration file.',
key
)
}
}
}
return null
}
// Check the config files // Check the config files
function checkMissedConfig () { function checkMissedConfig () {
@ -109,6 +39,14 @@ function checkMissedConfig () {
} }
} }
const redundancyVideos = config.get<any>('redundancy.videos.strategies')
if (isArray(redundancyVideos)) {
for (const r of redundancyVideos) {
if (!r.size) miss.push('redundancy.videos.strategies.size')
if (!r.min_lifetime) miss.push('redundancy.videos.strategies.min_lifetime')
}
}
const missingAlternatives = requiredAlternatives.filter( const missingAlternatives = requiredAlternatives.filter(
set => !set.find(alternative => !alternative.find(key => !config.has(key))) set => !set.find(alternative => !alternative.find(key => !config.has(key)))
) )
@ -163,36 +101,10 @@ async function checkFFmpegEncoders (): Promise<Map<string, boolean>> {
} }
} }
// We get db by param to not import it in this file (import orders)
async function clientsExist () {
const totalClients = await OAuthClientModel.countTotal()
return totalClients !== 0
}
// We get db by param to not import it in this file (import orders)
async function usersExist () {
const totalUsers = await UserModel.countTotal()
return totalUsers !== 0
}
// We get db by param to not import it in this file (import orders)
async function applicationExist () {
const totalApplication = await ApplicationModel.countTotal()
return totalApplication !== 0
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
export { export {
checkConfig,
checkFFmpeg, checkFFmpeg,
checkFFmpegEncoders, checkFFmpegEncoders,
checkMissedConfig, checkMissedConfig
clientsExist,
usersExist,
applicationExist,
checkActivityPubUrls
} }

View File

@ -601,7 +601,6 @@ const MEMOIZE_TTL = {
const REDUNDANCY = { const REDUNDANCY = {
VIDEOS: { VIDEOS: {
EXPIRES_AFTER_MS: 48 * 3600 * 1000, // 2 days
RANDOMIZED_FACTOR: 5 RANDOMIZED_FACTOR: 5
} }
} }
@ -750,10 +749,16 @@ function updateWebserverConfig () {
CONFIG.WEBSERVER.HOST = sanitizeHost(CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT, REMOTE_SCHEME.HTTP) CONFIG.WEBSERVER.HOST = sanitizeHost(CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT, REMOTE_SCHEME.HTTP)
} }
function buildVideosRedundancy (objs: VideosRedundancy[]): VideosRedundancy[] { function buildVideosRedundancy (objs: any[]): VideosRedundancy[] {
if (!objs) return [] if (!objs) return []
return objs.map(obj => Object.assign(obj, { size: bytes.parse(obj.size) })) return objs.map(obj => {
return Object.assign(obj, {
minLifetime: parseDuration(obj.min_lifetime),
size: bytes.parse(obj.size),
minViews: obj.min_views
})
})
} }
function buildLanguages () { function buildLanguages () {

View File

@ -1,6 +1,5 @@
// Constants first, database in second! // Constants first, database in second!
export * from './constants' export * from './constants'
export * from './database' export * from './database'
export * from './checker'
export * from './installer' export * from './installer'
export * from './migrator' export * from './migrator'

View File

@ -5,7 +5,7 @@ import { createApplicationActor, createUserAccountAndChannel } from '../lib/user
import { UserModel } from '../models/account/user' import { UserModel } from '../models/account/user'
import { ApplicationModel } from '../models/application/application' import { ApplicationModel } from '../models/application/application'
import { OAuthClientModel } from '../models/oauth/oauth-client' import { OAuthClientModel } from '../models/oauth/oauth-client'
import { applicationExist, clientsExist, usersExist } from './checker' import { applicationExist, clientsExist, usersExist } from './checker-after-init'
import { CACHE, CONFIG, LAST_MIGRATION_VERSION } from './constants' import { CACHE, CONFIG, LAST_MIGRATION_VERSION } from './constants'
import { sequelizeTypescript } from './database' import { sequelizeTypescript } from './database'
import { remove, ensureDir } from 'fs-extra' import { remove, ensureDir } from 'fs-extra'

View File

@ -50,7 +50,12 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: ActorModel[]): Acti
async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) { async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) {
const actors = await VideoShareModel.loadActorsByShare(video.id, t) const actors = await VideoShareModel.loadActorsByShare(video.id, t)
actors.push(video.VideoChannel.Account.Actor)
const videoActor = video.VideoChannel && video.VideoChannel.Account
? video.VideoChannel.Account.Actor
: await ActorModel.loadAccountActorByVideoId(video.id, t)
actors.push(videoActor)
return actors return actors
} }

View File

@ -1,7 +1,7 @@
import { CacheFileObject } from '../../../shared/index' import { CacheFileObject } from '../../../shared/index'
import { VideoModel } from '../../models/video/video' import { VideoModel } from '../../models/video/video'
import { sequelizeTypescript } from '../../initializers'
import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
import { Transaction } from 'sequelize'
function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) {
const url = cacheFileObject.url const url = cacheFileObject.url
@ -22,25 +22,29 @@ function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject
} }
} }
function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }, t: Transaction) {
return sequelizeTypescript.transaction(async t => { const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
return VideoRedundancyModel.create(attributes, { transaction: t }) return VideoRedundancyModel.create(attributes, { transaction: t })
})
} }
function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: { id?: number }) { function updateCacheFile (
cacheFileObject: CacheFileObject,
redundancyModel: VideoRedundancyModel,
video: VideoModel,
byActor: { id?: number },
t: Transaction
) {
if (redundancyModel.actorId !== byActor.id) { if (redundancyModel.actorId !== byActor.id) {
throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.') throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.')
} }
const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor) const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
redundancyModel.set('expires', attributes.expiresOn) redundancyModel.set('expires', attributes.expiresOn)
redundancyModel.set('fileUrl', attributes.fileUrl) redundancyModel.set('fileUrl', attributes.fileUrl)
return redundancyModel.save() return redundancyModel.save({ transaction: t })
} }
export { export {

View File

@ -95,7 +95,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate)
if (video.isOwned()) { if (video.isOwned()) {
// Don't resend the activity to the sender // Don't resend the activity to the sender
const exceptions = [ byActor ] const exceptions = [ byActor ]
await forwardActivity(activity, undefined, exceptions) await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
} }
} }
@ -104,12 +104,14 @@ async function processCacheFile (byActor: ActorModel, activity: ActivityCreate)
const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object }) const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object })
await createCacheFile(cacheFile, video, byActor) await sequelizeTypescript.transaction(async t => {
return createCacheFile(cacheFile, video, byActor, t)
})
if (video.isOwned()) { if (video.isOwned()) {
// Don't resend the activity to the sender // Don't resend the activity to the sender
const exceptions = [ byActor ] const exceptions = [ byActor ]
await forwardActivity(activity, undefined, exceptions) await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
} }
} }

View File

@ -100,7 +100,7 @@ async function processUndoCacheFile (byActor: ActorModel, activity: ActivityUndo
return sequelizeTypescript.transaction(async t => { return sequelizeTypescript.transaction(async t => {
const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id)
if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url) if (!cacheFile) throw new Error('Unknown video cache ' + cacheFileObject.id)
if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.') if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.')

View File

@ -12,6 +12,7 @@ import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-vali
import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file'
import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
import { createCacheFile, updateCacheFile } from '../cache-file' import { createCacheFile, updateCacheFile } from '../cache-file'
import { forwardVideoRelatedActivity } from '../send/utils'
async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) { async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) {
const objectType = activity.object.type const objectType = activity.object.type
@ -68,18 +69,29 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate)
async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) {
const cacheFileObject = activity.object as CacheFileObject const cacheFileObject = activity.object as CacheFileObject
if (!isCacheFileObjectValid(cacheFileObject) === false) { if (!isCacheFileObjectValid(cacheFileObject)) {
logger.debug('Cahe file object sent by update is not valid.', { cacheFileObject }) logger.debug('Cache file object sent by update is not valid.', { cacheFileObject })
return undefined return undefined
} }
const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.object })
if (!redundancyModel) {
const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.id })
return createCacheFile(cacheFileObject, video, byActor)
}
return updateCacheFile(cacheFileObject, redundancyModel, byActor) await sequelizeTypescript.transaction(async t => {
const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id, t)
if (!redundancyModel) {
await createCacheFile(cacheFileObject, video, byActor, t)
} else {
await updateCacheFile(cacheFileObject, redundancyModel, video, byActor, t)
}
})
if (video.isOwned()) {
// Don't resend the activity to the sender
const exceptions = [ byActor ]
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
}
} }
async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) {

View File

@ -7,8 +7,8 @@ import { VideoModel } from '../../../models/video/video'
import { VideoChannelModel } from '../../../models/video/video-channel' import { VideoChannelModel } from '../../../models/video/video-channel'
import { VideoShareModel } from '../../../models/video/video-share' import { VideoShareModel } from '../../../models/video/video-share'
import { getUpdateActivityPubUrl } from '../url' import { getUpdateActivityPubUrl } from '../url'
import { broadcastToFollowers, sendVideoRelatedActivity, unicastTo } from './utils' import { broadcastToFollowers, sendVideoRelatedActivity } from './utils'
import { audiencify, getActorsInvolvedInVideo, getAudience, getAudienceFromFollowersOf } from '../audience' import { audiencify, getActorsInvolvedInVideo, getAudience } from '../audience'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { VideoCaptionModel } from '../../../models/video/video-caption' import { VideoCaptionModel } from '../../../models/video/video-caption'
import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'

View File

@ -176,7 +176,7 @@ async function getOrCreateVideoAndAccountAndChannel (options: {
syncParam, syncParam,
refreshViews refreshViews
} }
const p = retryTransactionWrapper(refreshVideoIfNeeded, refreshOptions) const p = refreshVideoIfNeeded(refreshOptions)
if (syncParam.refreshVideo === true) videoFromDatabase = await p if (syncParam.refreshVideo === true) videoFromDatabase = await p
return { video: videoFromDatabase } return { video: videoFromDatabase }
@ -245,29 +245,37 @@ async function updateVideoFromAP (options: {
generateThumbnailFromUrl(options.video, options.videoObject.icon) generateThumbnailFromUrl(options.video, options.videoObject.icon)
.catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })) .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err }))
// Remove old video files {
const videoFileDestroyTasks: Bluebird<void>[] = [] const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject)
for (const videoFile of options.video.VideoFiles) { const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a))
videoFileDestroyTasks.push(videoFile.destroy(sequelizeOptions))
// Remove video files that do not exist anymore
const destroyTasks = options.video.VideoFiles
.filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f)))
.map(f => f.destroy(sequelizeOptions))
await Promise.all(destroyTasks)
// Update or add other one
const upsertTasks = videoFileAttributes.map(a => VideoFileModel.upsert(a, sequelizeOptions))
await Promise.all(upsertTasks)
} }
await Promise.all(videoFileDestroyTasks)
const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) {
const tasks = videoFileAttributes.map(f => VideoFileModel.create(f, sequelizeOptions)) // Update Tags
await Promise.all(tasks) const tags = options.videoObject.tag.map(tag => tag.name)
const tagInstances = await TagModel.findOrCreateTags(tags, t)
await options.video.$set('Tags', tagInstances, sequelizeOptions)
}
// Update Tags {
const tags = options.videoObject.tag.map(tag => tag.name) // Update captions
const tagInstances = await TagModel.findOrCreateTags(tags, t) await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t)
await options.video.$set('Tags', tagInstances, sequelizeOptions)
// Update captions const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => {
await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t)
})
const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { await Promise.all(videoCaptionsPromises)
return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) }
})
await Promise.all(videoCaptionsPromises)
}) })
logger.info('Remote video with uuid %s updated', options.videoObject.uuid) logger.info('Remote video with uuid %s updated', options.videoObject.uuid)
@ -382,7 +390,7 @@ async function refreshVideoIfNeeded (options: {
channel: channelActor.VideoChannel, channel: channelActor.VideoChannel,
updateViews: options.refreshViews updateViews: options.refreshViews
} }
await updateVideoFromAP(updateOptions) await retryTransactionWrapper(updateVideoFromAP, updateOptions)
await syncVideoExternalAttributes(video, videoObject, options.syncParam) await syncVideoExternalAttributes(video, videoObject, options.syncParam)
} catch (err) { } catch (err) {
logger.warn('Cannot refresh video.', { err }) logger.warn('Cannot refresh video.', { err })

View File

@ -1 +1,2 @@
export * from './videos-preview-cache' export * from './videos-preview-cache'
export * from './videos-caption-cache'

View File

@ -6,7 +6,8 @@ import { getServerActor } from '../helpers/utils'
async function removeVideoRedundancy (videoRedundancy: VideoRedundancyModel, t?: Transaction) { async function removeVideoRedundancy (videoRedundancy: VideoRedundancyModel, t?: Transaction) {
const serverActor = await getServerActor() const serverActor = await getServerActor()
await sendUndoCacheFile(serverActor, videoRedundancy, t) // Local cache, send undo to remote instances
if (videoRedundancy.actorId === serverActor.id) await sendUndoCacheFile(serverActor, videoRedundancy, t)
await videoRedundancy.destroy({ transaction: t }) await videoRedundancy.destroy({ transaction: t })
} }

View File

@ -1,7 +1,7 @@
import { AbstractScheduler } from './abstract-scheduler' import { AbstractScheduler } from './abstract-scheduler'
import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers' import { CONFIG, JOB_TTL, REDUNDANCY } from '../../initializers'
import { logger } from '../../helpers/logger' import { logger } from '../../helpers/logger'
import { VideoRedundancyStrategy, VideosRedundancy } from '../../../shared/models/redundancy' import { VideosRedundancy } from '../../../shared/models/redundancy'
import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
import { VideoFileModel } from '../../models/video/video-file' import { VideoFileModel } from '../../models/video/video-file'
import { downloadWebTorrentVideo } from '../../helpers/webtorrent' import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
@ -12,6 +12,7 @@ import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
import { VideoModel } from '../../models/video/video' import { VideoModel } from '../../models/video/video'
import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' import { getVideoCacheFileActivityPubUrl } from '../activitypub/url'
import { isTestInstance } from '../../helpers/core-utils' import { isTestInstance } from '../../helpers/core-utils'
import { removeVideoRedundancy } from '../redundancy'
export class VideosRedundancyScheduler extends AbstractScheduler { export class VideosRedundancyScheduler extends AbstractScheduler {
@ -30,7 +31,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
this.executing = true this.executing = true
for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
if (!isTestInstance()) logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)
try { try {
const videoToDuplicate = await this.findVideoToDuplicate(obj) const videoToDuplicate = await this.findVideoToDuplicate(obj)
@ -39,20 +40,24 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
const videoFiles = videoToDuplicate.VideoFiles const videoFiles = videoToDuplicate.VideoFiles
videoFiles.forEach(f => f.Video = videoToDuplicate) videoFiles.forEach(f => f.Video = videoToDuplicate)
if (await this.isTooHeavy(obj.strategy, videoFiles, obj.size)) { await this.purgeCacheIfNeeded(obj, videoFiles)
if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
if (await this.isTooHeavy(obj, videoFiles)) {
logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
continue continue
} }
logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy)
await this.createVideoRedundancy(obj.strategy, videoFiles) await this.createVideoRedundancy(obj, videoFiles)
} catch (err) { } catch (err) {
logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) logger.error('Cannot run videos redundancy %s.', obj.strategy, { err })
} }
} }
await this.removeExpired() await this.extendsLocalExpiration()
await this.purgeRemoteExpired()
this.executing = false this.executing = false
} }
@ -61,16 +66,27 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
return this.instance || (this.instance = new this()) return this.instance || (this.instance = new this())
} }
private async removeExpired () { private async extendsLocalExpiration () {
const expired = await VideoRedundancyModel.listAllExpired() const expired = await VideoRedundancyModel.listLocalExpired()
for (const m of expired) {
logger.info('Removing expired video %s from our redundancy system.', this.buildEntryLogId(m))
for (const redundancyModel of expired) {
try { try {
await m.destroy() const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
} catch (err) { } catch (err) {
logger.error('Cannot remove %s video from our redundancy system.', this.buildEntryLogId(m)) logger.error('Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel))
}
}
}
private async purgeRemoteExpired () {
const expired = await VideoRedundancyModel.listRemoteExpired()
for (const redundancyModel of expired) {
try {
await removeVideoRedundancy(redundancyModel)
} catch (err) {
logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel))
} }
} }
} }
@ -90,18 +106,14 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
} }
} }
private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) { private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
const serverActor = await getServerActor() const serverActor = await getServerActor()
for (const file of filesToDuplicate) { for (const file of filesToDuplicate) {
const existing = await VideoRedundancyModel.loadByFileId(file.id) const existing = await VideoRedundancyModel.loadByFileId(file.id)
if (existing) { if (existing) {
logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', file.Video.url, file.resolution, strategy) await this.extendsExpirationOf(existing, redundancy.minLifetime)
existing.expiresOn = this.buildNewExpiration()
await existing.save()
await sendUpdateCacheFile(serverActor, existing)
continue continue
} }
@ -109,7 +121,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id)
if (!video) continue if (!video) continue
logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy) logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy)
const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() const { baseUrlHttp, baseUrlWs } = video.getBaseUrls()
const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs)
@ -120,10 +132,10 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
await rename(tmpPath, destPath) await rename(tmpPath, destPath)
const createdModel = await VideoRedundancyModel.create({ const createdModel = await VideoRedundancyModel.create({
expiresOn: new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS), expiresOn: this.buildNewExpiration(redundancy.minLifetime),
url: getVideoCacheFileActivityPubUrl(file), url: getVideoCacheFileActivityPubUrl(file),
fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL),
strategy, strategy: redundancy.strategy,
videoFileId: file.id, videoFileId: file.id,
actorId: serverActor.id actorId: serverActor.id
}) })
@ -133,16 +145,36 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
} }
} }
private async isTooHeavy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[], maxSizeArg: number) { private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) {
const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate) logger.info('Extending expiration of %s.', redundancy.url)
const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(strategy) const serverActor = await getServerActor()
redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
await redundancy.save()
await sendUpdateCacheFile(serverActor, redundancy)
}
private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
while (this.isTooHeavy(redundancy, filesToDuplicate)) {
const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime)
if (!toDelete) return
await removeVideoRedundancy(toDelete)
}
}
private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
const maxSize = redundancy.size - this.getTotalFileSizes(filesToDuplicate)
const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy)
return totalDuplicated > maxSize return totalDuplicated > maxSize
} }
private buildNewExpiration () { private buildNewExpiration (expiresAfterMs: number) {
return new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS) return new Date(Date.now() + expiresAfterMs)
} }
private buildEntryLogId (object: VideoRedundancyModel) { private buildEntryLogId (object: VideoRedundancyModel) {

View File

@ -37,6 +37,7 @@ import { ServerModel } from '../server/server'
import { throwIfNotValid } from '../utils' import { throwIfNotValid } from '../utils'
import { VideoChannelModel } from '../video/video-channel' import { VideoChannelModel } from '../video/video-channel'
import { ActorFollowModel } from './actor-follow' import { ActorFollowModel } from './actor-follow'
import { VideoModel } from '../video/video'
enum ScopeNames { enum ScopeNames {
FULL = 'FULL' FULL = 'FULL'
@ -266,6 +267,36 @@ export class ActorModel extends Model<ActorModel> {
return ActorModel.unscoped().findById(id) return ActorModel.unscoped().findById(id)
} }
static loadAccountActorByVideoId (videoId: number, transaction: Sequelize.Transaction) {
const query = {
include: [
{
attributes: [ 'id' ],
model: AccountModel.unscoped(),
required: true,
include: [
{
attributes: [ 'id' ],
model: VideoChannelModel.unscoped(),
required: true,
include: {
attributes: [ 'id' ],
model: VideoModel.unscoped(),
required: true,
where: {
id: videoId
}
}
}
]
}
],
transaction
}
return ActorModel.unscoped().findOne(query as any) // FIXME: typings
}
static isActorUrlExist (url: string) { static isActorUrlExist (url: string) {
const query = { const query = {
raw: true, raw: true,

View File

@ -9,7 +9,6 @@ import {
Is, Is,
Model, Model,
Scopes, Scopes,
Sequelize,
Table, Table,
UpdatedAt UpdatedAt
} from 'sequelize-typescript' } from 'sequelize-typescript'
@ -28,6 +27,7 @@ import { ServerModel } from '../server/server'
import { sample } from 'lodash' import { sample } from 'lodash'
import { isTestInstance } from '../../helpers/core-utils' import { isTestInstance } from '../../helpers/core-utils'
import * as Bluebird from 'bluebird' import * as Bluebird from 'bluebird'
import * as Sequelize from 'sequelize'
export enum ScopeNames { export enum ScopeNames {
WITH_VIDEO = 'WITH_VIDEO' WITH_VIDEO = 'WITH_VIDEO'
@ -116,11 +116,11 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
Actor: ActorModel Actor: ActorModel
@AfterDestroy @AfterDestroy
static removeFilesAndSendDelete (instance: VideoRedundancyModel) { static removeFile (instance: VideoRedundancyModel) {
// Not us // Not us
if (!instance.strategy) return if (!instance.strategy) return
logger.info('Removing video file %s-.', instance.VideoFile.Video.uuid, instance.VideoFile.resolution) logger.info('Removing duplicated video file %s-%s.', instance.VideoFile.Video.uuid, instance.VideoFile.resolution)
return instance.VideoFile.Video.removeFile(instance.VideoFile) return instance.VideoFile.Video.removeFile(instance.VideoFile)
} }
@ -135,11 +135,12 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
return VideoRedundancyModel.scope(ScopeNames.WITH_VIDEO).findOne(query) return VideoRedundancyModel.scope(ScopeNames.WITH_VIDEO).findOne(query)
} }
static loadByUrl (url: string) { static loadByUrl (url: string, transaction?: Sequelize.Transaction) {
const query = { const query = {
where: { where: {
url url
} },
transaction
} }
return VideoRedundancyModel.findOne(query) return VideoRedundancyModel.findOne(query)
@ -157,7 +158,6 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
// On VideoModel! // On VideoModel!
const query = { const query = {
attributes: [ 'id', 'views' ], attributes: [ 'id', 'views' ],
logging: !isTestInstance(),
limit: randomizedFactor, limit: randomizedFactor,
order: getVideoSort('-views'), order: getVideoSort('-views'),
include: [ include: [
@ -174,7 +174,6 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
const query = { const query = {
attributes: [ 'id', 'views' ], attributes: [ 'id', 'views' ],
subQuery: false, subQuery: false,
logging: !isTestInstance(),
group: 'VideoModel.id', group: 'VideoModel.id',
limit: randomizedFactor, limit: randomizedFactor,
order: getVideoSort('-trending'), order: getVideoSort('-trending'),
@ -193,7 +192,6 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
// On VideoModel! // On VideoModel!
const query = { const query = {
attributes: [ 'id', 'publishedAt' ], attributes: [ 'id', 'publishedAt' ],
logging: !isTestInstance(),
limit: randomizedFactor, limit: randomizedFactor,
order: getVideoSort('-publishedAt'), order: getVideoSort('-publishedAt'),
where: { where: {
@ -210,11 +208,29 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
return VideoRedundancyModel.getVideoSample(VideoModel.unscoped().findAll(query)) return VideoRedundancyModel.getVideoSample(VideoModel.unscoped().findAll(query))
} }
static async loadOldestLocalThatAlreadyExpired (strategy: VideoRedundancyStrategy, expiresAfterMs: number) {
const expiredDate = new Date()
expiredDate.setMilliseconds(expiredDate.getMilliseconds() - expiresAfterMs)
const actor = await getServerActor()
const query = {
where: {
actorId: actor.id,
strategy,
createdAt: {
[ Sequelize.Op.lt ]: expiredDate
}
}
}
return VideoRedundancyModel.scope([ ScopeNames.WITH_VIDEO ]).findOne(query)
}
static async getTotalDuplicated (strategy: VideoRedundancyStrategy) { static async getTotalDuplicated (strategy: VideoRedundancyStrategy) {
const actor = await getServerActor() const actor = await getServerActor()
const options = { const options = {
logging: !isTestInstance(),
include: [ include: [
{ {
attributes: [], attributes: [],
@ -228,21 +244,39 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
] ]
} }
return VideoFileModel.sum('size', options) return VideoFileModel.sum('size', options as any) // FIXME: typings
} }
static listAllExpired () { static async listLocalExpired () {
const actor = await getServerActor()
const query = { const query = {
logging: !isTestInstance(),
where: { where: {
actorId: actor.id,
expiresOn: { expiresOn: {
[ Sequelize.Op.lt ]: new Date() [ Sequelize.Op.lt ]: new Date()
} }
} }
} }
return VideoRedundancyModel.scope(ScopeNames.WITH_VIDEO) return VideoRedundancyModel.scope([ ScopeNames.WITH_VIDEO ]).findAll(query)
.findAll(query) }
static async listRemoteExpired () {
const actor = await getServerActor()
const query = {
where: {
actorId: {
[Sequelize.Op.ne]: actor.id
},
expiresOn: {
[ Sequelize.Op.lt ]: new Date()
}
}
}
return VideoRedundancyModel.scope([ ScopeNames.WITH_VIDEO ]).findAll(query)
} }
static async getStats (strategy: VideoRedundancyStrategy) { static async getStats (strategy: VideoRedundancyStrategy) {
@ -299,7 +333,7 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
const notIn = Sequelize.literal( const notIn = Sequelize.literal(
'(' + '(' +
`SELECT "videoFileId" FROM "videoRedundancy" WHERE "actorId" = ${actor.id} AND "expiresOn" >= NOW()` + `SELECT "videoFileId" FROM "videoRedundancy" WHERE "actorId" = ${actor.id}` +
')' ')'
) )

View File

@ -106,4 +106,10 @@ export class VideoFileModel extends Model<VideoFileModel> {
return results.length === 1 return results.length === 1
}) })
} }
hasSameUniqueKeysThan (other: VideoFileModel) {
return this.fps === other.fps &&
this.resolution === other.resolution &&
this.videoId === other.videoId
}
} }

View File

@ -31,14 +31,13 @@ const expect = chai.expect
let servers: ServerInfo[] = [] let servers: ServerInfo[] = []
let video1Server2UUID: string let video1Server2UUID: string
let video2Server2UUID: string
function checkMagnetWebseeds (file: { magnetUri: string, resolution: { id: number } }, baseWebseeds: string[]) { function checkMagnetWebseeds (file: { magnetUri: string, resolution: { id: number } }, baseWebseeds: string[], server: ServerInfo) {
const parsed = magnetUtil.decode(file.magnetUri) const parsed = magnetUtil.decode(file.magnetUri)
for (const ws of baseWebseeds) { for (const ws of baseWebseeds) {
const found = parsed.urlList.find(url => url === `${ws}-${file.resolution.id}.mp4`) const found = parsed.urlList.find(url => url === `${ws}-${file.resolution.id}.mp4`)
expect(found, `Webseed ${ws} not found in ${file.magnetUri}`).to.not.be.undefined expect(found, `Webseed ${ws} not found in ${file.magnetUri} on server ${server.url}`).to.not.be.undefined
} }
} }
@ -49,6 +48,7 @@ async function runServers (strategy: VideoRedundancyStrategy, additionalParams:
check_interval: '5 seconds', check_interval: '5 seconds',
strategies: [ strategies: [
immutableAssign({ immutableAssign({
min_lifetime: '1 hour',
strategy: strategy, strategy: strategy,
size: '100KB' size: '100KB'
}, additionalParams) }, additionalParams)
@ -68,11 +68,6 @@ async function runServers (strategy: VideoRedundancyStrategy, additionalParams:
await viewVideo(servers[ 1 ].url, video1Server2UUID) await viewVideo(servers[ 1 ].url, video1Server2UUID)
} }
{
const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 2 server 2' })
video2Server2UUID = res.body.video.uuid
}
await waitJobs(servers) await waitJobs(servers)
// Server 1 and server 2 follow each other // Server 1 and server 2 follow each other
@ -85,36 +80,82 @@ async function runServers (strategy: VideoRedundancyStrategy, additionalParams:
await waitJobs(servers) await waitJobs(servers)
} }
async function check1WebSeed (strategy: VideoRedundancyStrategy) { async function check1WebSeed (strategy: VideoRedundancyStrategy, videoUUID?: string) {
if (!videoUUID) videoUUID = video1Server2UUID
const webseeds = [ const webseeds = [
'http://localhost:9002/static/webseed/' + video1Server2UUID 'http://localhost:9002/static/webseed/' + videoUUID
] ]
for (const server of servers) { for (const server of servers) {
{ {
const res = await getVideo(server.url, video1Server2UUID) const res = await getVideo(server.url, videoUUID)
const video: VideoDetails = res.body const video: VideoDetails = res.body
video.files.forEach(f => checkMagnetWebseeds(f, webseeds)) for (const f of video.files) {
} checkMagnetWebseeds(f, webseeds, server)
}
{
const res = await getStats(server.url)
const data: ServerStats = res.body
expect(data.videosRedundancy).to.have.lengthOf(1)
const stat = data.videosRedundancy[0]
expect(stat.strategy).to.equal(strategy)
expect(stat.totalSize).to.equal(102400)
expect(stat.totalUsed).to.equal(0)
expect(stat.totalVideoFiles).to.equal(0)
expect(stat.totalVideos).to.equal(0)
} }
} }
} }
async function enableRedundancy () { async function checkStatsWith2Webseed (strategy: VideoRedundancyStrategy) {
const res = await getStats(servers[0].url)
const data: ServerStats = res.body
expect(data.videosRedundancy).to.have.lengthOf(1)
const stat = data.videosRedundancy[0]
expect(stat.strategy).to.equal(strategy)
expect(stat.totalSize).to.equal(102400)
expect(stat.totalUsed).to.be.at.least(1).and.below(102401)
expect(stat.totalVideoFiles).to.equal(4)
expect(stat.totalVideos).to.equal(1)
}
async function checkStatsWith1Webseed (strategy: VideoRedundancyStrategy) {
const res = await getStats(servers[0].url)
const data: ServerStats = res.body
expect(data.videosRedundancy).to.have.lengthOf(1)
const stat = data.videosRedundancy[0]
expect(stat.strategy).to.equal(strategy)
expect(stat.totalSize).to.equal(102400)
expect(stat.totalUsed).to.equal(0)
expect(stat.totalVideoFiles).to.equal(0)
expect(stat.totalVideos).to.equal(0)
}
async function check2Webseeds (strategy: VideoRedundancyStrategy, videoUUID?: string) {
if (!videoUUID) videoUUID = video1Server2UUID
const webseeds = [
'http://localhost:9001/static/webseed/' + videoUUID,
'http://localhost:9002/static/webseed/' + videoUUID
]
for (const server of servers) {
{
const res = await getVideo(server.url, videoUUID)
const video: VideoDetails = res.body
for (const file of video.files) {
checkMagnetWebseeds(file, webseeds, server)
}
}
}
const files = await readdir(join(root(), 'test1', 'videos'))
expect(files).to.have.lengthOf(4)
for (const resolution of [ 240, 360, 480, 720 ]) {
expect(files.find(f => f === `${videoUUID}-${resolution}.mp4`)).to.not.be.undefined
}
}
async function enableRedundancyOnServer1 () {
await updateRedundancy(servers[ 0 ].url, servers[ 0 ].accessToken, servers[ 1 ].host, true) await updateRedundancy(servers[ 0 ].url, servers[ 0 ].accessToken, servers[ 1 ].host, true)
const res = await getFollowingListPaginationAndSort(servers[ 0 ].url, 0, 5, '-createdAt') const res = await getFollowingListPaginationAndSort(servers[ 0 ].url, 0, 5, '-createdAt')
@ -129,50 +170,6 @@ async function enableRedundancy () {
expect(server2.following.hostRedundancyAllowed).to.be.true expect(server2.following.hostRedundancyAllowed).to.be.true
} }
async function check2Webseeds (strategy: VideoRedundancyStrategy) {
await waitJobs(servers)
await wait(15000)
await waitJobs(servers)
const webseeds = [
'http://localhost:9001/static/webseed/' + video1Server2UUID,
'http://localhost:9002/static/webseed/' + video1Server2UUID
]
for (const server of servers) {
{
const res = await getVideo(server.url, video1Server2UUID)
const video: VideoDetails = res.body
for (const file of video.files) {
checkMagnetWebseeds(file, webseeds)
}
}
}
const files = await readdir(join(root(), 'test1', 'videos'))
expect(files).to.have.lengthOf(4)
for (const resolution of [ 240, 360, 480, 720 ]) {
expect(files.find(f => f === `${video1Server2UUID}-${resolution}.mp4`)).to.not.be.undefined
}
{
const res = await getStats(servers[0].url)
const data: ServerStats = res.body
expect(data.videosRedundancy).to.have.lengthOf(1)
const stat = data.videosRedundancy[0]
expect(stat.strategy).to.equal(strategy)
expect(stat.totalSize).to.equal(102400)
expect(stat.totalUsed).to.be.at.least(1).and.below(102401)
expect(stat.totalVideoFiles).to.equal(4)
expect(stat.totalVideos).to.equal(1)
}
}
async function cleanServers () { async function cleanServers () {
killallServers(servers) killallServers(servers)
} }
@ -188,18 +185,24 @@ describe('Test videos redundancy', function () {
return runServers(strategy) return runServers(strategy)
}) })
it('Should have 1 webseed on the first video', function () { it('Should have 1 webseed on the first video', async function () {
return check1WebSeed(strategy) await check1WebSeed(strategy)
await checkStatsWith1Webseed(strategy)
}) })
it('Should enable redundancy on server 1', function () { it('Should enable redundancy on server 1', function () {
return enableRedundancy() return enableRedundancyOnServer1()
}) })
it('Should have 2 webseed on the first video', function () { it('Should have 2 webseed on the first video', async function () {
this.timeout(40000) this.timeout(40000)
return check2Webseeds(strategy) await waitJobs(servers)
await wait(15000)
await waitJobs(servers)
await check2Webseeds(strategy)
await checkStatsWith2Webseed(strategy)
}) })
after(function () { after(function () {
@ -216,18 +219,24 @@ describe('Test videos redundancy', function () {
return runServers(strategy) return runServers(strategy)
}) })
it('Should have 1 webseed on the first video', function () { it('Should have 1 webseed on the first video', async function () {
return check1WebSeed(strategy) await check1WebSeed(strategy)
await checkStatsWith1Webseed(strategy)
}) })
it('Should enable redundancy on server 1', function () { it('Should enable redundancy on server 1', function () {
return enableRedundancy() return enableRedundancyOnServer1()
}) })
it('Should have 2 webseed on the first video', function () { it('Should have 2 webseed on the first video', async function () {
this.timeout(40000) this.timeout(40000)
return check2Webseeds(strategy) await waitJobs(servers)
await wait(15000)
await waitJobs(servers)
await check2Webseeds(strategy)
await checkStatsWith2Webseed(strategy)
}) })
after(function () { after(function () {
@ -241,15 +250,16 @@ describe('Test videos redundancy', function () {
before(function () { before(function () {
this.timeout(120000) this.timeout(120000)
return runServers(strategy, { minViews: 3 }) return runServers(strategy, { min_views: 3 })
}) })
it('Should have 1 webseed on the first video', function () { it('Should have 1 webseed on the first video', async function () {
return check1WebSeed(strategy) await check1WebSeed(strategy)
await checkStatsWith1Webseed(strategy)
}) })
it('Should enable redundancy on server 1', function () { it('Should enable redundancy on server 1', function () {
return enableRedundancy() return enableRedundancyOnServer1()
}) })
it('Should still have 1 webseed on the first video', async function () { it('Should still have 1 webseed on the first video', async function () {
@ -259,10 +269,11 @@ describe('Test videos redundancy', function () {
await wait(15000) await wait(15000)
await waitJobs(servers) await waitJobs(servers)
return check1WebSeed(strategy) await check1WebSeed(strategy)
await checkStatsWith1Webseed(strategy)
}) })
it('Should view 2 times the first video', async function () { it('Should view 2 times the first video to have > min_views config', async function () {
this.timeout(40000) this.timeout(40000)
await viewVideo(servers[ 0 ].url, video1Server2UUID) await viewVideo(servers[ 0 ].url, video1Server2UUID)
@ -272,10 +283,117 @@ describe('Test videos redundancy', function () {
await waitJobs(servers) await waitJobs(servers)
}) })
it('Should have 2 webseed on the first video', function () { it('Should have 2 webseed on the first video', async function () {
this.timeout(40000) this.timeout(40000)
return check2Webseeds(strategy) await waitJobs(servers)
await wait(15000)
await waitJobs(servers)
await check2Webseeds(strategy)
await checkStatsWith2Webseed(strategy)
})
after(function () {
return cleanServers()
})
})
describe('Test expiration', function () {
const strategy = 'recently-added'
async function checkContains (servers: ServerInfo[], str: string) {
for (const server of servers) {
const res = await getVideo(server.url, video1Server2UUID)
const video: VideoDetails = res.body
for (const f of video.files) {
expect(f.magnetUri).to.contain(str)
}
}
}
async function checkNotContains (servers: ServerInfo[], str: string) {
for (const server of servers) {
const res = await getVideo(server.url, video1Server2UUID)
const video: VideoDetails = res.body
for (const f of video.files) {
expect(f.magnetUri).to.not.contain(str)
}
}
}
before(async function () {
this.timeout(120000)
await runServers(strategy, { min_lifetime: '7 seconds', min_views: 0 })
await enableRedundancyOnServer1()
})
it('Should still have 2 webseeds after 10 seconds', async function () {
this.timeout(40000)
await wait(10000)
try {
await checkContains(servers, 'http%3A%2F%2Flocalhost%3A9001')
} catch {
// Maybe a server deleted a redundancy in the scheduler
await wait(2000)
await checkContains(servers, 'http%3A%2F%2Flocalhost%3A9001')
}
})
it('Should stop server 1 and expire video redundancy', async function () {
this.timeout(40000)
killallServers([ servers[0] ])
await wait(10000)
await checkNotContains([ servers[1], servers[2] ], 'http%3A%2F%2Flocalhost%3A9001')
})
after(function () {
return killallServers([ servers[1], servers[2] ])
})
})
describe('Test file replacement', function () {
let video2Server2UUID: string
const strategy = 'recently-added'
before(async function () {
this.timeout(120000)
await runServers(strategy, { min_lifetime: '7 seconds', min_views: 0 })
await enableRedundancyOnServer1()
await waitJobs(servers)
await wait(5000)
await waitJobs(servers)
await check2Webseeds(strategy)
await checkStatsWith2Webseed(strategy)
const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 2 server 2' })
video2Server2UUID = res.body.video.uuid
})
it('Should cache video 2 webseed on the first video', async function () {
this.timeout(40000)
this.retries(3)
await waitJobs(servers)
await wait(7000)
await check1WebSeed(strategy, video1Server2UUID)
await check2Webseeds(strategy, video2Server2UUID)
}) })
after(function () { after(function () {

View File

@ -144,8 +144,8 @@ function runServer (serverNumber: number, configOverride?: Object) {
}) })
} }
async function reRunServer (server: ServerInfo) { async function reRunServer (server: ServerInfo, configOverride?: any) {
const newServer = await runServer(server.serverNumber) const newServer = await runServer(server.serverNumber, configOverride)
server.app = newServer.app server.app = newServer.app
return server return server

View File

@ -3,17 +3,20 @@ export type VideoRedundancyStrategy = 'most-views' | 'trending' | 'recently-adde
export type MostViewsRedundancyStrategy = { export type MostViewsRedundancyStrategy = {
strategy: 'most-views' strategy: 'most-views'
size: number size: number
minLifetime: number
} }
export type TrendingRedundancyStrategy = { export type TrendingRedundancyStrategy = {
strategy: 'trending' strategy: 'trending'
size: number size: number
minLifetime: number
} }
export type RecentlyAddedStrategy = { export type RecentlyAddedStrategy = {
strategy: 'recently-added' strategy: 'recently-added'
size: number size: number
minViews: number minViews: number
minLifetime: number
} }
export type VideosRedundancy = MostViewsRedundancyStrategy | TrendingRedundancyStrategy | RecentlyAddedStrategy export type VideosRedundancy = MostViewsRedundancyStrategy | TrendingRedundancyStrategy | RecentlyAddedStrategy

46
support/doc/redundancy.md Normal file
View File

@ -0,0 +1,46 @@
# Redundancy
A PeerTube instance can cache other PeerTube videos to improve bandwidth of popular videos or small instances.
## How it works
The instance administrator can choose between multiple redundancy strategies (cache trending videos or recently uploaded videos etc), set their maximum size and the minimum duplication lifetime.
Then, they choose the instances they want to cache in `Manage follows -> Following` admin table.
Videos are kept in the cache for at least `min_lifetime`, and then evicted when the cache is full.
When PeerTube chooses a video to duplicate, it imports all the resolution files (to avoid consistency issues) using their magnet URI and put them in the `storage.videos` directory.
Then it sends a `Create -> CacheFile` ActivityPub message to other federated instances. This new instance is injected as [WebSeed](https://github.com/Chocobozzz/PeerTube/blob/develop/FAQ.md#what-is-webseed) in the magnet URI by instances that received this ActivityPub message.
## Stats
See the `/api/v1/server/stats` endpoint. For example:
```
{
...
"videosRedundancy": [
{
"totalUsed": 0,
"totalVideos": 0,
"totalVideoFiles": 0,
"strategy": "most-views",
"totalSize": 104857600
},
{
"totalUsed": 0,
"totalVideos": 0,
"totalVideoFiles": 0,
"strategy": "trending",
"totalSize": 104857600
},
{
"totalUsed": 0,
"totalVideos": 0,
"totalVideoFiles": 0,
"strategy": "recently-added",
"totalSize": 104857600
}
]
}
```