Add trending videos strategy
This commit is contained in:
parent
6f0c46be8c
commit
b36f41ca09
|
@ -74,6 +74,9 @@ redundancy:
|
|||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'most-views' # Cache videos that have the most views
|
||||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'trending' # Cache trending videos
|
||||
|
||||
cache:
|
||||
previews:
|
||||
|
|
|
@ -75,6 +75,9 @@ redundancy:
|
|||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'most-views' # Cache videos that have the most views
|
||||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'trending' # Cache trending videos
|
||||
|
||||
###############################################################################
|
||||
#
|
||||
|
|
|
@ -26,6 +26,9 @@ redundancy:
|
|||
-
|
||||
size: '100KB'
|
||||
strategy: 'most-views'
|
||||
-
|
||||
size: '100KB'
|
||||
strategy: 'trending'
|
||||
|
||||
cache:
|
||||
previews:
|
||||
|
|
|
@ -6,9 +6,8 @@ for i in $(seq 1 6); do
|
|||
dbname="peertube_test$i"
|
||||
|
||||
dropdb --if-exists "$dbname"
|
||||
rm -rf "./test$i"
|
||||
rm -f "./config/local-test.json"
|
||||
rm -f "./config/local-test-$i.json"
|
||||
rm -rf "./test$i" "./config/local-test.json" "./config/local-test-$i.json"
|
||||
|
||||
createdb -O peertube "$dbname"
|
||||
psql -c "CREATE EXTENSION pg_trgm;" "$dbname"
|
||||
psql -c "CREATE EXTENSION unaccent;" "$dbname"
|
||||
|
|
|
@ -41,7 +41,7 @@ function checkConfig () {
|
|||
const redundancyVideos = config.get<VideosRedundancy[]>('redundancy.videos')
|
||||
if (isArray(redundancyVideos)) {
|
||||
for (const r of redundancyVideos) {
|
||||
if ([ 'most-views' ].indexOf(r.strategy) === -1) {
|
||||
if ([ 'most-views', 'trending' ].indexOf(r.strategy) === -1) {
|
||||
return 'Redundancy video entries should have "most-views" strategy instead of ' + r.strategy
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,6 +75,8 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
|
|||
|
||||
private findVideoToDuplicate (strategy: VideoRedundancyStrategy) {
|
||||
if (strategy === 'most-views') return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
|
||||
|
||||
if (strategy === 'trending') return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
|
||||
}
|
||||
|
||||
private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) {
|
||||
|
|
|
@ -14,11 +14,10 @@ import {
|
|||
UpdatedAt
|
||||
} from 'sequelize-typescript'
|
||||
import { ActorModel } from '../activitypub/actor'
|
||||
import { throwIfNotValid } from '../utils'
|
||||
import { getVideoSort, throwIfNotValid } from '../utils'
|
||||
import { isActivityPubUrlValid, isUrlValid } from '../../helpers/custom-validators/activitypub/misc'
|
||||
import { CONSTRAINTS_FIELDS, VIDEO_EXT_MIMETYPE } from '../../initializers'
|
||||
import { CONFIG, CONSTRAINTS_FIELDS, VIDEO_EXT_MIMETYPE } from '../../initializers'
|
||||
import { VideoFileModel } from '../video/video-file'
|
||||
import { isDateValid } from '../../helpers/custom-validators/misc'
|
||||
import { getServerActor } from '../../helpers/utils'
|
||||
import { VideoModel } from '../video/video'
|
||||
import { VideoRedundancyStrategy } from '../../../shared/models/redundancy'
|
||||
|
@ -145,50 +144,51 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
|
|||
return VideoRedundancyModel.findOne(query)
|
||||
}
|
||||
|
||||
static getVideoSample (rows: { id: number }[]) {
|
||||
const ids = rows.map(r => r.id)
|
||||
const id = sample(ids)
|
||||
|
||||
return VideoModel.loadWithFile(id, undefined, !isTestInstance())
|
||||
}
|
||||
|
||||
static async findMostViewToDuplicate (randomizedFactor: number) {
|
||||
// On VideoModel!
|
||||
const query = {
|
||||
attributes: [ 'id', 'views' ],
|
||||
logging: !isTestInstance(),
|
||||
limit: randomizedFactor,
|
||||
order: [ [ 'views', 'DESC' ] ],
|
||||
order: getVideoSort('-views'),
|
||||
include: [
|
||||
{
|
||||
model: VideoFileModel.unscoped(),
|
||||
required: true,
|
||||
where: {
|
||||
id: {
|
||||
[ Sequelize.Op.notIn ]: await VideoRedundancyModel.buildExcludeIn()
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
attributes: [],
|
||||
model: VideoChannelModel.unscoped(),
|
||||
required: true,
|
||||
include: [
|
||||
{
|
||||
attributes: [],
|
||||
model: ActorModel.unscoped(),
|
||||
required: true,
|
||||
include: [
|
||||
{
|
||||
attributes: [],
|
||||
model: ServerModel.unscoped(),
|
||||
required: true,
|
||||
where: {
|
||||
redundancyAllowed: true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
await VideoRedundancyModel.buildVideoFileForDuplication(),
|
||||
VideoRedundancyModel.buildServerRedundancyInclude()
|
||||
]
|
||||
}
|
||||
|
||||
const rows = await VideoModel.unscoped().findAll(query)
|
||||
|
||||
return sample(rows)
|
||||
return VideoRedundancyModel.getVideoSample(rows as { id: number }[])
|
||||
}
|
||||
|
||||
static async findTrendingToDuplicate (randomizedFactor: number) {
|
||||
// On VideoModel!
|
||||
const query = {
|
||||
attributes: [ 'id', 'views' ],
|
||||
subQuery: false,
|
||||
logging: !isTestInstance(),
|
||||
group: 'VideoModel.id',
|
||||
limit: randomizedFactor,
|
||||
order: getVideoSort('-trending'),
|
||||
include: [
|
||||
await VideoRedundancyModel.buildVideoFileForDuplication(),
|
||||
VideoRedundancyModel.buildServerRedundancyInclude(),
|
||||
|
||||
VideoModel.buildTrendingQuery(CONFIG.TRENDING.VIDEOS.INTERVAL_DAYS)
|
||||
]
|
||||
}
|
||||
|
||||
const rows = await VideoModel.unscoped().findAll(query)
|
||||
|
||||
return VideoRedundancyModel.getVideoSample(rows as { id: number }[])
|
||||
}
|
||||
|
||||
static async getVideoFiles (strategy: VideoRedundancyStrategy) {
|
||||
|
@ -211,7 +211,7 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
|
|||
logging: !isTestInstance(),
|
||||
where: {
|
||||
expiresOn: {
|
||||
[Sequelize.Op.lt]: new Date()
|
||||
[ Sequelize.Op.lt ]: new Date()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -237,13 +237,50 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
|
|||
}
|
||||
}
|
||||
|
||||
private static async buildExcludeIn () {
|
||||
// Don't include video files we already duplicated
|
||||
private static async buildVideoFileForDuplication () {
|
||||
const actor = await getServerActor()
|
||||
|
||||
return Sequelize.literal(
|
||||
const notIn = Sequelize.literal(
|
||||
'(' +
|
||||
`SELECT "videoFileId" FROM "videoRedundancy" WHERE "actorId" = ${actor.id} AND "expiresOn" >= NOW()` +
|
||||
')'
|
||||
)
|
||||
|
||||
return {
|
||||
attributes: [],
|
||||
model: VideoFileModel.unscoped(),
|
||||
required: true,
|
||||
where: {
|
||||
id: {
|
||||
[ Sequelize.Op.notIn ]: notIn
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static buildServerRedundancyInclude () {
|
||||
return {
|
||||
attributes: [],
|
||||
model: VideoChannelModel.unscoped(),
|
||||
required: true,
|
||||
include: [
|
||||
{
|
||||
attributes: [],
|
||||
model: ActorModel.unscoped(),
|
||||
required: true,
|
||||
include: [
|
||||
{
|
||||
attributes: [],
|
||||
model: ServerModel.unscoped(),
|
||||
required: true,
|
||||
where: {
|
||||
redundancyAllowed: true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -387,16 +387,7 @@ type AvailableForListIDsOptions = {
|
|||
}
|
||||
|
||||
if (options.trendingDays) {
|
||||
query.include.push({
|
||||
attributes: [],
|
||||
model: VideoViewModel,
|
||||
required: false,
|
||||
where: {
|
||||
startDate: {
|
||||
[ Sequelize.Op.gte ]: new Date(new Date().getTime() - (24 * 3600 * 1000) * options.trendingDays)
|
||||
}
|
||||
}
|
||||
})
|
||||
query.include.push(VideoModel.buildTrendingQuery(options.trendingDays))
|
||||
|
||||
query.subQuery = false
|
||||
}
|
||||
|
@ -1071,9 +1062,12 @@ export class VideoModel extends Model<VideoModel> {
|
|||
}
|
||||
|
||||
static load (id: number, t?: Sequelize.Transaction) {
|
||||
const options = t ? { transaction: t } : undefined
|
||||
return VideoModel.findById(id, { transaction: t })
|
||||
}
|
||||
|
||||
return VideoModel.findById(id, options)
|
||||
static loadWithFile (id: number, t?: Sequelize.Transaction, logging?: boolean) {
|
||||
return VideoModel.scope(ScopeNames.WITH_FILES)
|
||||
.findById(id, { transaction: t, logging })
|
||||
}
|
||||
|
||||
static loadByUrlAndPopulateAccount (url: string, t?: Sequelize.Transaction) {
|
||||
|
@ -1191,6 +1185,20 @@ export class VideoModel extends Model<VideoModel> {
|
|||
.then(rows => rows.map(r => r[ field ]))
|
||||
}
|
||||
|
||||
static buildTrendingQuery (trendingDays: number) {
|
||||
return {
|
||||
attributes: [],
|
||||
subQuery: false,
|
||||
model: VideoViewModel,
|
||||
required: false,
|
||||
where: {
|
||||
startDate: {
|
||||
[ Sequelize.Op.gte ]: new Date(new Date().getTime() - (24 * 3600 * 1000) * trendingDays)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static buildActorWhereWithFilter (filter?: VideoFilter) {
|
||||
if (filter && filter === 'local') {
|
||||
return {
|
||||
|
|
|
@ -22,9 +22,14 @@ import { updateRedundancy } from '../../utils/server/redundancy'
|
|||
import { ActorFollow } from '../../../../shared/models/actors'
|
||||
import { readdir } from 'fs-extra'
|
||||
import { join } from 'path'
|
||||
import { VideoRedundancyStrategy } from '../../../../shared/models/redundancy'
|
||||
|
||||
const expect = chai.expect
|
||||
|
||||
let servers: ServerInfo[] = []
|
||||
let video1Server2UUID: string
|
||||
let video2Server2UUID: string
|
||||
|
||||
function checkMagnetWebseeds (file: { magnetUri: string, resolution: { id: number } }, baseWebseeds: string[]) {
|
||||
const parsed = magnetUtil.decode(file.magnetUri)
|
||||
|
||||
|
@ -34,107 +39,159 @@ function checkMagnetWebseeds (file: { magnetUri: string, resolution: { id: numbe
|
|||
}
|
||||
}
|
||||
|
||||
async function runServers (strategy: VideoRedundancyStrategy) {
|
||||
const config = {
|
||||
redundancy: {
|
||||
videos: [
|
||||
{
|
||||
strategy: strategy,
|
||||
size: '100KB'
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
servers = await flushAndRunMultipleServers(3, config)
|
||||
|
||||
// Get the access tokens
|
||||
await setAccessTokensToServers(servers)
|
||||
|
||||
{
|
||||
const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 1 server 2' })
|
||||
video1Server2UUID = res.body.video.uuid
|
||||
|
||||
await viewVideo(servers[ 1 ].url, video1Server2UUID)
|
||||
}
|
||||
|
||||
{
|
||||
const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 2 server 2' })
|
||||
video2Server2UUID = res.body.video.uuid
|
||||
}
|
||||
|
||||
await waitJobs(servers)
|
||||
|
||||
// Server 1 and server 2 follow each other
|
||||
await doubleFollow(servers[ 0 ], servers[ 1 ])
|
||||
// Server 1 and server 3 follow each other
|
||||
await doubleFollow(servers[ 0 ], servers[ 2 ])
|
||||
// Server 2 and server 3 follow each other
|
||||
await doubleFollow(servers[ 1 ], servers[ 2 ])
|
||||
|
||||
await waitJobs(servers)
|
||||
}
|
||||
|
||||
async function check1WebSeed () {
|
||||
const webseeds = [
|
||||
'http://localhost:9002/static/webseed/' + video1Server2UUID
|
||||
]
|
||||
|
||||
for (const server of servers) {
|
||||
const res = await getVideo(server.url, video1Server2UUID)
|
||||
|
||||
const video: VideoDetails = res.body
|
||||
video.files.forEach(f => checkMagnetWebseeds(f, webseeds))
|
||||
}
|
||||
}
|
||||
|
||||
async function enableRedundancy () {
|
||||
await updateRedundancy(servers[ 0 ].url, servers[ 0 ].accessToken, servers[ 1 ].host, true)
|
||||
|
||||
const res = await getFollowingListPaginationAndSort(servers[ 0 ].url, 0, 5, '-createdAt')
|
||||
const follows: ActorFollow[] = res.body.data
|
||||
const server2 = follows.find(f => f.following.host === 'localhost:9002')
|
||||
const server3 = follows.find(f => f.following.host === 'localhost:9003')
|
||||
|
||||
expect(server3).to.not.be.undefined
|
||||
expect(server3.following.hostRedundancyAllowed).to.be.false
|
||||
|
||||
expect(server2).to.not.be.undefined
|
||||
expect(server2.following.hostRedundancyAllowed).to.be.true
|
||||
}
|
||||
|
||||
async function check2Webseeds () {
|
||||
await waitJobs(servers)
|
||||
await wait(15000)
|
||||
await waitJobs(servers)
|
||||
|
||||
const webseeds = [
|
||||
'http://localhost:9001/static/webseed/' + video1Server2UUID,
|
||||
'http://localhost:9002/static/webseed/' + video1Server2UUID
|
||||
]
|
||||
|
||||
for (const server of servers) {
|
||||
const res = await getVideo(server.url, video1Server2UUID)
|
||||
|
||||
const video: VideoDetails = res.body
|
||||
|
||||
for (const file of video.files) {
|
||||
checkMagnetWebseeds(file, webseeds)
|
||||
}
|
||||
}
|
||||
|
||||
const files = await readdir(join(root(), 'test1', 'videos'))
|
||||
expect(files).to.have.lengthOf(4)
|
||||
|
||||
for (const resolution of [ 240, 360, 480, 720 ]) {
|
||||
expect(files.find(f => f === `${video1Server2UUID}-${resolution}.mp4`)).to.not.be.undefined
|
||||
}
|
||||
}
|
||||
|
||||
async function cleanServers () {
|
||||
killallServers(servers)
|
||||
}
|
||||
|
||||
describe('Test videos redundancy', function () {
|
||||
let servers: ServerInfo[] = []
|
||||
let video1Server2UUID: string
|
||||
let video2Server2UUID: string
|
||||
|
||||
before(async function () {
|
||||
this.timeout(120000)
|
||||
describe('With most-views strategy', function () {
|
||||
|
||||
servers = await flushAndRunMultipleServers(3)
|
||||
before(function () {
|
||||
this.timeout(120000)
|
||||
|
||||
// Get the access tokens
|
||||
await setAccessTokensToServers(servers)
|
||||
return runServers('most-views')
|
||||
})
|
||||
|
||||
{
|
||||
const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 1 server 2' })
|
||||
video1Server2UUID = res.body.video.uuid
|
||||
it('Should have 1 webseed on the first video', function () {
|
||||
return check1WebSeed()
|
||||
})
|
||||
|
||||
await viewVideo(servers[1].url, video1Server2UUID)
|
||||
}
|
||||
it('Should enable redundancy on server 1', async function () {
|
||||
return enableRedundancy()
|
||||
})
|
||||
|
||||
{
|
||||
const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 2 server 2' })
|
||||
video2Server2UUID = res.body.video.uuid
|
||||
}
|
||||
it('Should have 2 webseed on the first video', async function () {
|
||||
this.timeout(40000)
|
||||
|
||||
await waitJobs(servers)
|
||||
return check2Webseeds()
|
||||
})
|
||||
|
||||
// Server 1 and server 2 follow each other
|
||||
await doubleFollow(servers[0], servers[1])
|
||||
// Server 1 and server 3 follow each other
|
||||
await doubleFollow(servers[0], servers[2])
|
||||
// Server 2 and server 3 follow each other
|
||||
await doubleFollow(servers[1], servers[2])
|
||||
|
||||
await waitJobs(servers)
|
||||
after(function () {
|
||||
return cleanServers()
|
||||
})
|
||||
})
|
||||
|
||||
it('Should have 1 webseed on the first video', async function () {
|
||||
const webseeds = [
|
||||
'http://localhost:9002/static/webseed/' + video1Server2UUID
|
||||
]
|
||||
describe('With trending strategy', function () {
|
||||
|
||||
for (const server of servers) {
|
||||
const res = await getVideo(server.url, video1Server2UUID)
|
||||
before(function () {
|
||||
this.timeout(120000)
|
||||
|
||||
const video: VideoDetails = res.body
|
||||
video.files.forEach(f => checkMagnetWebseeds(f, webseeds))
|
||||
}
|
||||
})
|
||||
return runServers('trending')
|
||||
})
|
||||
|
||||
it('Should enable redundancy on server 1', async function () {
|
||||
await updateRedundancy(servers[0].url, servers[0].accessToken, servers[1].host, true)
|
||||
it('Should have 1 webseed on the first video', function () {
|
||||
return check1WebSeed()
|
||||
})
|
||||
|
||||
const res = await getFollowingListPaginationAndSort(servers[0].url, 0, 5, '-createdAt')
|
||||
const follows: ActorFollow[] = res.body.data
|
||||
const server2 = follows.find(f => f.following.host === 'localhost:9002')
|
||||
const server3 = follows.find(f => f.following.host === 'localhost:9003')
|
||||
it('Should enable redundancy on server 1', async function () {
|
||||
return enableRedundancy()
|
||||
})
|
||||
|
||||
expect(server3).to.not.be.undefined
|
||||
expect(server3.following.hostRedundancyAllowed).to.be.false
|
||||
it('Should have 2 webseed on the first video', async function () {
|
||||
this.timeout(40000)
|
||||
|
||||
expect(server2).to.not.be.undefined
|
||||
expect(server2.following.hostRedundancyAllowed).to.be.true
|
||||
})
|
||||
return check2Webseeds()
|
||||
})
|
||||
|
||||
it('Should have 2 webseed on the first video', async function () {
|
||||
this.timeout(40000)
|
||||
|
||||
await waitJobs(servers)
|
||||
await wait(15000)
|
||||
await waitJobs(servers)
|
||||
|
||||
const webseeds = [
|
||||
'http://localhost:9001/static/webseed/' + video1Server2UUID,
|
||||
'http://localhost:9002/static/webseed/' + video1Server2UUID
|
||||
]
|
||||
|
||||
for (const server of servers) {
|
||||
const res = await getVideo(server.url, video1Server2UUID)
|
||||
|
||||
const video: VideoDetails = res.body
|
||||
|
||||
for (const file of video.files) {
|
||||
checkMagnetWebseeds(file, webseeds)
|
||||
}
|
||||
}
|
||||
|
||||
const files = await readdir(join(root(), 'test1', 'videos'))
|
||||
expect(files).to.have.lengthOf(4)
|
||||
|
||||
for (const resolution of [ 240, 360, 480, 720 ]) {
|
||||
expect(files.find(f => f === `${video1Server2UUID}-${resolution}.mp4`)).to.not.be.undefined
|
||||
}
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
killallServers(servers)
|
||||
|
||||
// Keep the logs if the test failed
|
||||
if (this['ok']) {
|
||||
await flushTests()
|
||||
}
|
||||
after(function () {
|
||||
return cleanServers()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
@ -35,7 +35,7 @@ interface ServerInfo {
|
|||
}
|
||||
}
|
||||
|
||||
function flushAndRunMultipleServers (totalServers) {
|
||||
function flushAndRunMultipleServers (totalServers: number, configOverride?: Object) {
|
||||
let apps = []
|
||||
let i = 0
|
||||
|
||||
|
@ -53,7 +53,7 @@ function flushAndRunMultipleServers (totalServers) {
|
|||
for (let j = 1; j <= totalServers; j++) {
|
||||
// For the virtual buffer
|
||||
setTimeout(() => {
|
||||
runServer(j).then(app => anotherServerDone(j, app))
|
||||
runServer(j, configOverride).then(app => anotherServerDone(j, app))
|
||||
}, 1000 * (j - 1))
|
||||
}
|
||||
})
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
export type VideoRedundancyStrategy = 'most-views'
|
||||
export type VideoRedundancyStrategy = 'most-views' | 'trending'
|
||||
|
||||
export interface VideosRedundancy {
|
||||
strategy: VideoRedundancyStrategy
|
||||
|
|
Loading…
Reference in New Issue