parent
eba9528391
commit
dbdc20e673
|
@ -3,6 +3,7 @@
|
||||||
import { expect } from 'chai'
|
import { expect } from 'chai'
|
||||||
import { prepareViewsServers, prepareViewsVideos, processViewersStats } from '@tests/shared/views.js'
|
import { prepareViewsServers, prepareViewsVideos, processViewersStats } from '@tests/shared/views.js'
|
||||||
import { cleanupTests, PeerTubeServer } from '@peertube/peertube-server-commands'
|
import { cleanupTests, PeerTubeServer } from '@peertube/peertube-server-commands'
|
||||||
|
import { wait } from '@peertube/peertube-core-utils'
|
||||||
|
|
||||||
describe('Test views retention stats', function () {
|
describe('Test views retention stats', function () {
|
||||||
let servers: PeerTubeServer[]
|
let servers: PeerTubeServer[]
|
||||||
|
@ -45,6 +46,28 @@ describe('Test views retention stats', function () {
|
||||||
|
|
||||||
expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 75, 25, 25, 25, 0 ])
|
expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 75, 25, 25, 25, 0 ])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('Should display appropriate retention metrics after a server restart', async function () {
|
||||||
|
this.timeout(240000)
|
||||||
|
|
||||||
|
const newVideo = await servers[0].videos.quickUpload({ name: 'video 2' })
|
||||||
|
|
||||||
|
await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.2,127.0.0.1', id: newVideo.id, currentTimes: [ 0, 1 ] })
|
||||||
|
await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.3,127.0.0.1', id: newVideo.id, currentTimes: [ 1, 3 ] })
|
||||||
|
|
||||||
|
await wait(2500)
|
||||||
|
|
||||||
|
await servers[0].kill()
|
||||||
|
|
||||||
|
await servers[0].run()
|
||||||
|
|
||||||
|
await processViewersStats(servers)
|
||||||
|
|
||||||
|
const { data } = await servers[0].videoStats.getRetentionStats({ videoId: newVideo.id })
|
||||||
|
expect(data).to.have.lengthOf(6)
|
||||||
|
|
||||||
|
expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 100, 50, 50, 0, 0 ])
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
after(async function () {
|
after(async function () {
|
||||||
|
|
|
@ -480,6 +480,7 @@ const VIEW_LIFETIME = {
|
||||||
VIEWER_COUNTER: 60000 * 2, // 2 minutes
|
VIEWER_COUNTER: 60000 * 2, // 2 minutes
|
||||||
VIEWER_STATS: 60000 * 60 // 1 hour
|
VIEWER_STATS: 60000 * 60 // 1 hour
|
||||||
}
|
}
|
||||||
|
let VIEWER_SYNC_REDIS = 30000 // Sync viewer into redis
|
||||||
|
|
||||||
const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 100
|
const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 100
|
||||||
|
|
||||||
|
@ -1102,6 +1103,8 @@ if (process.env.PRODUCTION_CONSTANTS !== 'true') {
|
||||||
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
|
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
|
||||||
|
|
||||||
JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000
|
JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000
|
||||||
|
|
||||||
|
VIEWER_SYNC_REDIS = 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isTestInstance()) {
|
if (isTestInstance()) {
|
||||||
|
@ -1202,6 +1205,7 @@ export {
|
||||||
DEFAULT_THEME_NAME,
|
DEFAULT_THEME_NAME,
|
||||||
NSFW_POLICY_TYPES,
|
NSFW_POLICY_TYPES,
|
||||||
STATIC_MAX_AGE,
|
STATIC_MAX_AGE,
|
||||||
|
VIEWER_SYNC_REDIS,
|
||||||
STATIC_PATHS,
|
STATIC_PATHS,
|
||||||
VIDEO_IMPORT_TIMEOUT,
|
VIDEO_IMPORT_TIMEOUT,
|
||||||
VIDEO_PLAYLIST_TYPES,
|
VIDEO_PLAYLIST_TYPES,
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { VideoViewEvent } from '@peertube/peertube-models'
|
||||||
import { isTestOrDevInstance } from '@peertube/peertube-node-utils'
|
import { isTestOrDevInstance } from '@peertube/peertube-node-utils'
|
||||||
import { GeoIP } from '@server/helpers/geo-ip.js'
|
import { GeoIP } from '@server/helpers/geo-ip.js'
|
||||||
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
|
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
|
||||||
import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants.js'
|
import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEWER_SYNC_REDIS, VIEW_LIFETIME } from '@server/initializers/constants.js'
|
||||||
import { sequelizeTypescript } from '@server/initializers/database.js'
|
import { sequelizeTypescript } from '@server/initializers/database.js'
|
||||||
import { sendCreateWatchAction } from '@server/lib/activitypub/send/index.js'
|
import { sendCreateWatchAction } from '@server/lib/activitypub/send/index.js'
|
||||||
import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url.js'
|
import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url.js'
|
||||||
|
@ -33,11 +33,14 @@ type LocalViewerStats = {
|
||||||
|
|
||||||
export class VideoViewerStats {
|
export class VideoViewerStats {
|
||||||
private processingViewersStats = false
|
private processingViewersStats = false
|
||||||
|
private processingRedisWrites = false
|
||||||
|
|
||||||
private readonly viewerCache = new Map<string, LocalViewerStats>()
|
private readonly viewerCache = new Map<string, LocalViewerStats>()
|
||||||
|
private readonly redisPendingWrites = new Map<string, { ip: string, videoId: number, stats: LocalViewerStats }>()
|
||||||
|
|
||||||
constructor () {
|
constructor () {
|
||||||
setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
|
setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
|
||||||
|
setInterval(() => this.syncRedisWrites(), VIEWER_SYNC_REDIS)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
@ -123,7 +126,7 @@ export class VideoViewerStats {
|
||||||
|
|
||||||
logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
|
logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
|
||||||
|
|
||||||
await this.setLocalVideoViewer(ip, video.id, stats)
|
this.setLocalVideoViewer(ip, video.id, stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
async processViewerStats () {
|
async processViewerStats () {
|
||||||
|
@ -135,6 +138,8 @@ export class VideoViewerStats {
|
||||||
const now = new Date().getTime()
|
const now = new Date().getTime()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
await this.syncRedisWrites()
|
||||||
|
|
||||||
const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
|
const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
|
||||||
|
|
||||||
for (const key of allKeys) {
|
for (const key of allKeys) {
|
||||||
|
@ -222,7 +227,7 @@ export class VideoViewerStats {
|
||||||
const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(ip, videoId)
|
const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(ip, videoId)
|
||||||
this.viewerCache.set(viewerKey, stats)
|
this.viewerCache.set(viewerKey, stats)
|
||||||
|
|
||||||
return Redis.Instance.setLocalVideoViewer(ip, videoId, stats)
|
this.redisPendingWrites.set(viewerKey, { ip, videoId, stats })
|
||||||
}
|
}
|
||||||
|
|
||||||
private deleteLocalVideoViewersKeys (key: string) {
|
private deleteLocalVideoViewersKeys (key: string) {
|
||||||
|
@ -230,4 +235,23 @@ export class VideoViewerStats {
|
||||||
|
|
||||||
return Redis.Instance.deleteLocalVideoViewersKeys(key)
|
return Redis.Instance.deleteLocalVideoViewersKeys(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async syncRedisWrites () {
|
||||||
|
if (this.processingRedisWrites) return
|
||||||
|
|
||||||
|
this.processingRedisWrites = true
|
||||||
|
|
||||||
|
for (const [ key, pendingWrite ] of this.redisPendingWrites) {
|
||||||
|
const { ip, videoId, stats } = pendingWrite
|
||||||
|
this.redisPendingWrites.delete(key)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await Redis.Instance.setLocalVideoViewer(ip, videoId, stats)
|
||||||
|
} catch (err) {
|
||||||
|
logger.error('Cannot write viewer into redis', { ip, videoId, stats, err })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.processingRedisWrites = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue