From dbdc20e673f845b4a143b34d696c4368c5addf60 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 30 Nov 2023 10:50:47 +0100 Subject: [PATCH] Optimize views endpoint Lazy write data in redis --- .../api/views/video-views-retention-stats.ts | 23 ++++++++++++++ server/core/initializers/constants.ts | 4 +++ .../lib/views/shared/video-viewer-stats.ts | 30 +++++++++++++++++-- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/packages/tests/src/api/views/video-views-retention-stats.ts b/packages/tests/src/api/views/video-views-retention-stats.ts index 4cd0c7da9..9279b29df 100644 --- a/packages/tests/src/api/views/video-views-retention-stats.ts +++ b/packages/tests/src/api/views/video-views-retention-stats.ts @@ -3,6 +3,7 @@ import { expect } from 'chai' import { prepareViewsServers, prepareViewsVideos, processViewersStats } from '@tests/shared/views.js' import { cleanupTests, PeerTubeServer } from '@peertube/peertube-server-commands' +import { wait } from '@peertube/peertube-core-utils' describe('Test views retention stats', function () { let servers: PeerTubeServer[] @@ -45,6 +46,28 @@ describe('Test views retention stats', function () { expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 75, 25, 25, 25, 0 ]) }) + + it('Should display appropriate retention metrics after a server restart', async function () { + this.timeout(240000) + + const newVideo = await servers[0].videos.quickUpload({ name: 'video 2' }) + + await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.2,127.0.0.1', id: newVideo.id, currentTimes: [ 0, 1 ] }) + await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.3,127.0.0.1', id: newVideo.id, currentTimes: [ 1, 3 ] }) + + await wait(2500) + + await servers[0].kill() + + await servers[0].run() + + await processViewersStats(servers) + + const { data } = await servers[0].videoStats.getRetentionStats({ videoId: newVideo.id }) + expect(data).to.have.lengthOf(6) + + expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 100, 50, 50, 0, 0 ]) + }) }) after(async function () { diff --git a/server/core/initializers/constants.ts b/server/core/initializers/constants.ts index d632b94e9..669de5a47 100644 --- a/server/core/initializers/constants.ts +++ b/server/core/initializers/constants.ts @@ -480,6 +480,7 @@ const VIEW_LIFETIME = { VIEWER_COUNTER: 60000 * 2, // 2 minutes VIEWER_STATS: 60000 * 60 // 1 hour } +let VIEWER_SYNC_REDIS = 30000 // Sync viewer into redis const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 100 @@ -1102,6 +1103,8 @@ if (process.env.PRODUCTION_CONSTANTS !== 'true') { PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000 + + VIEWER_SYNC_REDIS = 1000 } if (isTestInstance()) { @@ -1202,6 +1205,7 @@ export { DEFAULT_THEME_NAME, NSFW_POLICY_TYPES, STATIC_MAX_AGE, + VIEWER_SYNC_REDIS, STATIC_PATHS, VIDEO_IMPORT_TIMEOUT, VIDEO_PLAYLIST_TYPES, diff --git a/server/core/lib/views/shared/video-viewer-stats.ts b/server/core/lib/views/shared/video-viewer-stats.ts index 5b5998b0a..957e6decb 100644 --- a/server/core/lib/views/shared/video-viewer-stats.ts +++ b/server/core/lib/views/shared/video-viewer-stats.ts @@ -3,7 +3,7 @@ import { VideoViewEvent } from '@peertube/peertube-models' import { isTestOrDevInstance } from '@peertube/peertube-node-utils' import { GeoIP } from '@server/helpers/geo-ip.js' import { logger, loggerTagsFactory } from '@server/helpers/logger.js' -import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants.js' +import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEWER_SYNC_REDIS, VIEW_LIFETIME } from '@server/initializers/constants.js' import { sequelizeTypescript } from '@server/initializers/database.js' import { sendCreateWatchAction } from '@server/lib/activitypub/send/index.js' import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url.js' @@ -33,11 +33,14 @@ type LocalViewerStats = { export class VideoViewerStats { private processingViewersStats = false + private processingRedisWrites = false private readonly viewerCache = new Map() + private readonly redisPendingWrites = new Map() constructor () { setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) + setInterval(() => this.syncRedisWrites(), VIEWER_SYNC_REDIS) } // --------------------------------------------------------------------------- @@ -123,7 +126,7 @@ export class VideoViewerStats { logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) - await this.setLocalVideoViewer(ip, video.id, stats) + this.setLocalVideoViewer(ip, video.id, stats) } async processViewerStats () { @@ -135,6 +138,8 @@ export class VideoViewerStats { const now = new Date().getTime() try { + await this.syncRedisWrites() + const allKeys = await Redis.Instance.listLocalVideoViewerKeys() for (const key of allKeys) { @@ -222,7 +227,7 @@ export class VideoViewerStats { const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(ip, videoId) this.viewerCache.set(viewerKey, stats) - return Redis.Instance.setLocalVideoViewer(ip, videoId, stats) + this.redisPendingWrites.set(viewerKey, { ip, videoId, stats }) } private deleteLocalVideoViewersKeys (key: string) { @@ -230,4 +235,23 @@ export class VideoViewerStats { return Redis.Instance.deleteLocalVideoViewersKeys(key) } + + private async syncRedisWrites () { + if (this.processingRedisWrites) return + + this.processingRedisWrites = true + + for (const [ key, pendingWrite ] of this.redisPendingWrites) { + const { ip, videoId, stats } = pendingWrite + this.redisPendingWrites.delete(key) + + try { + await Redis.Instance.setLocalVideoViewer(ip, videoId, stats) + } catch (err) { + logger.error('Cannot write viewer into redis', { ip, videoId, stats, err }) + } + } + + this.processingRedisWrites = false + } }