From b4f4432459f22994cb8fa667c862a0edd7af0ebc Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 1 Dec 2023 14:53:17 +0100 Subject: [PATCH] Viewers federation protocol v2 More efficient than the current one where instance is not fast enough to send all viewers if a video becomes popular The new protocol can be enabled by setting env USE_VIEWERS_FEDERATION_V2='true' Introduce a result field in View activity that contains the number of viewers. This field is used by the origin instance to send the total viewers on the video to remote instances. The difference with the current protocol is that we don't have to send viewers individually to remote instances. There are 4 cases: * View activity from federation on Remote Video -> instance replaces all current viewers by a new viewer that contains the result counter * View activity from federation on Local Video -> instance adds the viewer without considering the result counter * Local view on Remote Video -> instance adds the viewer and send it to the origin instance * Local view on Local Video -> instance adds the viewer Periodically PeerTube cleanups expired viewers. On local videos, the instance sends to remote instances a View activity with the result counter so they can update their viewers counter for that particular video --- packages/models/src/activitypub/activity.ts | 5 + .../src/activitypub/objects/video-object.ts | 1 + packages/node-utils/src/env.ts | 4 + .../src/api/views/video-views-counter.ts | 268 ++++++++++-------- packages/tests/src/shared/views.ts | 13 +- server/core/helpers/activity-pub-utils.ts | 8 +- server/core/helpers/debounce.ts | 1 - .../lib/activitypub/process/process-view.ts | 24 +- server/core/lib/activitypub/send/send-view.ts | 50 ++-- .../lib/views/shared/video-viewer-counters.ts | 99 +++++-- server/core/lib/views/shared/video-views.ts | 2 +- server/core/lib/views/video-views-manager.ts | 22 +- .../video/formatter/video-api-format.ts | 2 +- 13 files changed, 327 insertions(+), 172 deletions(-) diff --git a/packages/models/src/activitypub/activity.ts b/packages/models/src/activitypub/activity.ts index 78a3ab33b..22f9edd78 100644 --- a/packages/models/src/activitypub/activity.ts +++ b/packages/models/src/activitypub/activity.ts @@ -116,6 +116,11 @@ export interface ActivityView extends BaseActivity { // If sending a "viewer" event expires?: string + result?: { + type: 'InteractionCounter' + interactionType: 'WatchAction' + userInteractionCount: number + } } export interface ActivityDislike extends BaseActivity { diff --git a/packages/models/src/activitypub/objects/video-object.ts b/packages/models/src/activitypub/objects/video-object.ts index 9abae6a39..754f69f83 100644 --- a/packages/models/src/activitypub/objects/video-object.ts +++ b/packages/models/src/activitypub/objects/video-object.ts @@ -18,6 +18,7 @@ export interface VideoObject { licence: ActivityIdentifierObject language: ActivityIdentifierObject subtitleLanguage: ActivityIdentifierObject[] + views: number sensitive: boolean diff --git a/packages/node-utils/src/env.ts b/packages/node-utils/src/env.ts index 1a28f509e..1c6a956b9 100644 --- a/packages/node-utils/src/env.ts +++ b/packages/node-utils/src/env.ts @@ -56,3 +56,7 @@ export function isProdInstance () { export function getAppNumber () { return process.env.NODE_APP_INSTANCE || '' } + +export function isUsingViewersFederationV2 () { + return process.env.USE_VIEWERS_FEDERATION_V2 === 'true' +} diff --git a/packages/tests/src/api/views/video-views-counter.ts b/packages/tests/src/api/views/video-views-counter.ts index d9afb0f18..9b83176dc 100644 --- a/packages/tests/src/api/views/video-views-counter.ts +++ b/packages/tests/src/api/views/video-views-counter.ts @@ -21,133 +21,171 @@ describe('Test video views/viewers counters', function () { } } - before(async function () { - this.timeout(120000) + function runTests () { + describe('Test views counter on VOD', function () { + let videoUUID: string - servers = await prepareViewsServers() - }) + before(async function () { + this.timeout(120000) - describe('Test views counter on VOD', function () { - let videoUUID: string + const { uuid } = await servers[0].videos.quickUpload({ name: 'video' }) + videoUUID = uuid + + await waitJobs(servers) + }) + + it('Should not view a video if watch time is below the threshold', async function () { + await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] }) + await processViewsBuffer(servers) + + await checkCounter('views', videoUUID, 0) + }) + + it('Should view a video if watch time is above the threshold', async function () { + await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] }) + await processViewsBuffer(servers) + + await checkCounter('views', videoUUID, 1) + }) + + it('Should not view again this video with the same IP', async function () { + await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] }) + await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] }) + await processViewsBuffer(servers) + + await checkCounter('views', videoUUID, 2) + }) + + it('Should view the video from server 2 and send the event', async function () { + await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] }) + await waitJobs(servers) + await processViewsBuffer(servers) + + await checkCounter('views', videoUUID, 3) + }) + }) + + describe('Test views and viewers counters on live and VOD', function () { + let liveVideoId: string + let vodVideoId: string + let command: FfmpegCommand + + before(async function () { + this.timeout(240000); + + ({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true })) + }) + + it('Should display no views and viewers', async function () { + await checkCounter('views', liveVideoId, 0) + await checkCounter('viewers', liveVideoId, 0) + + await checkCounter('views', vodVideoId, 0) + await checkCounter('viewers', vodVideoId, 0) + }) + + it('Should view twice and display 1 view/viewer', async function () { + this.timeout(30000) + + for (let i = 0; i < 3; i++) { + await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) + await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) + await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) + await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) + + await wait(1000) + } + + await waitJobs(servers) + + await checkCounter('viewers', liveVideoId, 1) + await checkCounter('viewers', vodVideoId, 1) + + await processViewsBuffer(servers) + + await checkCounter('views', liveVideoId, 1) + await checkCounter('views', vodVideoId, 1) + }) + + it('Should wait and display 0 viewers but still have 1 view', async function () { + this.timeout(45000) + + let error = false + + do { + try { + await checkCounter('views', liveVideoId, 1) + await checkCounter('viewers', liveVideoId, 0) + + await checkCounter('views', vodVideoId, 1) + await checkCounter('viewers', vodVideoId, 0) + + error = false + await wait(2500) + } catch { + error = true + } + } while (error) + }) + + it('Should view on a remote and on local and display appropriate views/viewers', async function () { + this.timeout(30000) + + await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] }) + await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] }) + await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) + await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) + await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) + + await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) + await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) + await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) + + await wait(3000) // Throttled federation + await waitJobs(servers) + + await checkCounter('viewers', liveVideoId, 2) + await checkCounter('viewers', vodVideoId, 3) + + await processViewsBuffer(servers) + + await checkCounter('views', liveVideoId, 3) + await checkCounter('views', vodVideoId, 4) + }) + + after(async function () { + await stopFfmpeg(command) + }) + }) + } + + describe('Federation V1', function () { before(async function () { this.timeout(120000) - const { uuid } = await servers[0].videos.quickUpload({ name: 'video' }) - videoUUID = uuid - - await waitJobs(servers) + servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: false }) }) - it('Should not view a video if watch time is below the threshold', async function () { - await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] }) - await processViewsBuffer(servers) - - await checkCounter('views', videoUUID, 0) - }) - - it('Should view a video if watch time is above the threshold', async function () { - await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] }) - await processViewsBuffer(servers) - - await checkCounter('views', videoUUID, 1) - }) - - it('Should not view again this video with the same IP', async function () { - await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] }) - await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] }) - await processViewsBuffer(servers) - - await checkCounter('views', videoUUID, 2) - }) - - it('Should view the video from server 2 and send the event', async function () { - await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] }) - await waitJobs(servers) - await processViewsBuffer(servers) - - await checkCounter('views', videoUUID, 3) - }) - }) - - describe('Test views and viewers counters on live and VOD', function () { - let liveVideoId: string - let vodVideoId: string - let command: FfmpegCommand - - before(async function () { - this.timeout(240000); - - ({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true })) - }) - - it('Should display no views and viewers', async function () { - await checkCounter('views', liveVideoId, 0) - await checkCounter('viewers', liveVideoId, 0) - - await checkCounter('views', vodVideoId, 0) - await checkCounter('viewers', vodVideoId, 0) - }) - - it('Should view twice and display 1 view/viewer', async function () { - this.timeout(30000) - - await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) - await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) - await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) - await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) - - await waitJobs(servers) - await checkCounter('viewers', liveVideoId, 1) - await checkCounter('viewers', vodVideoId, 1) - - await processViewsBuffer(servers) - - await checkCounter('views', liveVideoId, 1) - await checkCounter('views', vodVideoId, 1) - }) - - it('Should wait and display 0 viewers but still have 1 view', async function () { - this.timeout(30000) - - await wait(12000) - await waitJobs(servers) - - await checkCounter('views', liveVideoId, 1) - await checkCounter('viewers', liveVideoId, 0) - - await checkCounter('views', vodVideoId, 1) - await checkCounter('viewers', vodVideoId, 0) - }) - - it('Should view on a remote and on local and display 2 viewers and 3 views', async function () { - this.timeout(30000) - - await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) - await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) - await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] }) - - await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) - await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) - await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] }) - - await waitJobs(servers) - - await checkCounter('viewers', liveVideoId, 2) - await checkCounter('viewers', vodVideoId, 2) - - await processViewsBuffer(servers) - - await checkCounter('views', liveVideoId, 3) - await checkCounter('views', vodVideoId, 3) - }) + runTests() after(async function () { - await stopFfmpeg(command) + await cleanupTests(servers) }) }) - after(async function () { - await cleanupTests(servers) + describe('Federation V2', function () { + + before(async function () { + this.timeout(120000) + + servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: true }) + }) + + runTests() + + after(async function () { + await cleanupTests(servers) + }) }) }) diff --git a/packages/tests/src/shared/views.ts b/packages/tests/src/shared/views.ts index 8d0fa567d..ef97fa442 100644 --- a/packages/tests/src/shared/views.ts +++ b/packages/tests/src/shared/views.ts @@ -30,8 +30,17 @@ async function processViewsBuffer (servers: PeerTubeServer[]) { await waitJobs(servers) } -async function prepareViewsServers () { - const servers = await createMultipleServers(2) +async function prepareViewsServers (options: { + viewersFederationV2?: boolean + viewExpiration?: string // default 1 second +} = {}) { + const { viewExpiration = '1 second' } = options + + const env = options?.viewersFederationV2 === true + ? { USE_VIEWERS_FEDERATION_V2: 'true' } + : undefined + + const servers = await createMultipleServers(2, { views: { videos: { ip_view_expiration: viewExpiration } } }, { env }) await setAccessTokensToServers(servers) await setDefaultVideoChannel(servers) diff --git a/server/core/helpers/activity-pub-utils.ts b/server/core/helpers/activity-pub-utils.ts index 6e28c90e2..fb9dda336 100644 --- a/server/core/helpers/activity-pub-utils.ts +++ b/server/core/helpers/activity-pub-utils.ts @@ -196,11 +196,17 @@ const contextStore: { [ id in ContextType ]: (string | { [ id: string ]: string uuid: 'sc:identifier' }), + View: buildContext({ + WatchAction: 'sc:WatchAction', + InteractionCounter: 'sc:InteractionCounter', + interactionType: 'sc:interactionType', + userInteractionCount: 'sc:userInteractionCount' + }), + Collection: buildContext(), Follow: buildContext(), Reject: buildContext(), Accept: buildContext(), - View: buildContext(), Announce: buildContext(), Comment: buildContext(), Delete: buildContext(), diff --git a/server/core/helpers/debounce.ts b/server/core/helpers/debounce.ts index 77d99a894..d39540813 100644 --- a/server/core/helpers/debounce.ts +++ b/server/core/helpers/debounce.ts @@ -9,7 +9,6 @@ export function Debounce (config: { timeoutMS: number }) { timeoutRef = setTimeout(() => { original.apply(this, args) - }, config.timeoutMS) } } diff --git a/server/core/lib/activitypub/process/process-view.ts b/server/core/lib/activitypub/process/process-view.ts index c96948e85..630ce05d8 100644 --- a/server/core/lib/activitypub/process/process-view.ts +++ b/server/core/lib/activitypub/process/process-view.ts @@ -28,11 +28,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu allowRefresh: false }) - const viewerExpires = activity.expires - ? new Date(activity.expires) - : undefined + await VideoViewsManager.Instance.processRemoteView({ + video, + viewerId: activity.id, - await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires }) + viewerExpires: activity.expires + ? new Date(activity.expires) + : undefined, + viewerResultCounter: getViewerResultCounter(activity) + }) if (video.isOwned()) { // Forward the view but don't resend the activity to the sender @@ -40,3 +44,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu await forwardVideoRelatedActivity(activity, undefined, exceptions, video) } } + +// Viewer protocol V2 +function getViewerResultCounter (activity: ActivityView) { + const result = activity.result + + if (!activity.expires || result?.interactionType !== 'WatchAction' || result?.type !== 'InteractionCounter') return undefined + + const counter = parseInt(result.userInteractionCount + '') + if (isNaN(counter)) return undefined + + return counter +} diff --git a/server/core/lib/activitypub/send/send-view.ts b/server/core/lib/activitypub/send/send-view.ts index 10cd9508f..46f91a275 100644 --- a/server/core/lib/activitypub/send/send-view.ts +++ b/server/core/lib/activitypub/send/send-view.ts @@ -6,24 +6,23 @@ import { logger } from '../../../helpers/logger.js' import { audiencify, getAudience } from '../audience.js' import { getLocalVideoViewActivityPubUrl } from '../url.js' import { sendVideoRelatedActivity } from './shared/send-utils.js' - -type ViewType = 'view' | 'viewer' +import { isUsingViewersFederationV2 } from '@peertube/peertube-node-utils' async function sendView (options: { byActor: MActorLight - type: ViewType video: MVideoImmutable viewerIdentifier: string + viewersCount?: number transaction?: Transaction }) { - const { byActor, type, video, viewerIdentifier, transaction } = options + const { byActor, viewersCount, video, viewerIdentifier, transaction } = options - logger.info('Creating job to send %s of %s.', type, video.url) + logger.info('Creating job to send %s of %s.', viewersCount !== undefined ? 'viewer' : 'view', video.url) const activityBuilder = (audience: ActivityAudience) => { const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier) - return buildViewActivity({ url, byActor, video, audience, type }) + return buildViewActivity({ url, byActor, video, audience, viewersCount }) } return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true }) @@ -41,22 +40,33 @@ function buildViewActivity (options: { url: string byActor: MActorAudience video: MVideoUrl - type: ViewType + viewersCount?: number audience?: ActivityAudience }): ActivityView { - const { url, byActor, type, video, audience = getAudience(byActor) } = options + const { url, byActor, viewersCount, video, audience = getAudience(byActor) } = options - return audiencify( - { - id: url, - type: 'View' as 'View', - actor: byActor.url, - object: video.url, + const base = { + id: url, + type: 'View' as 'View', + actor: byActor.url, + object: video.url + } - expires: type === 'viewer' - ? new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString() - : undefined - }, - audience - ) + if (viewersCount === undefined) { + return audiencify(base, audience) + } + + return audiencify({ + ...base, + + expires: new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString(), + + result: isUsingViewersFederationV2() + ? { + interactionType: 'WatchAction', + type: 'InteractionCounter', + userInteractionCount: viewersCount + } + : undefined + }, audience) } diff --git a/server/core/lib/views/shared/video-viewer-counters.ts b/server/core/lib/views/shared/video-viewer-counters.ts index 605e45645..8a18f94db 100644 --- a/server/core/lib/views/shared/video-viewer-counters.ts +++ b/server/core/lib/views/shared/video-viewer-counters.ts @@ -1,4 +1,5 @@ -import { buildUUID, isTestOrDevInstance, sha256 } from '@peertube/peertube-node-utils' +import { buildUUID, isTestOrDevInstance, isUsingViewersFederationV2, sha256 } from '@peertube/peertube-node-utils' +import { exists } from '@server/helpers/custom-validators/misc.js' import { logger, loggerTagsFactory } from '@server/helpers/logger.js' import { VIEW_LIFETIME } from '@server/initializers/constants.js' import { sendView } from '@server/lib/activitypub/send/send-view.js' @@ -17,6 +18,7 @@ type Viewer = { id: string viewerScope: ViewerScope videoScope: VideoScope + viewerCount: number lastFederation?: number } @@ -54,22 +56,48 @@ export class VideoViewerCounters { return false } - const newViewer = await this.addViewerToVideo({ viewerId, video, viewerScope: 'local' }) + const newViewer = this.addViewerToVideo({ viewerId, video, viewerScope: 'local', viewerCount: 1 }) await this.federateViewerIfNeeded(video, newViewer) return true } - async addRemoteViewer (options: { + addRemoteViewerOnLocalVideo (options: { video: MVideo viewerId: string viewerExpires: Date }) { const { video, viewerExpires, viewerId } = options - logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) + logger.debug('Adding remote viewer to local video %s.', video.uuid, { viewerId, viewerExpires, ...lTags(video.uuid) }) - await this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote' }) + this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote', viewerCount: 1 }) + + return true + } + + addRemoteViewerOnRemoteVideo (options: { + video: MVideo + viewerId: string + viewerExpires: Date + viewerResultCounter?: number + }) { + const { video, viewerExpires, viewerId, viewerResultCounter } = options + + logger.debug( + 'Adding remote viewer to remote video %s.', video.uuid, + { viewerId, viewerResultCounter, viewerExpires, ...lTags(video.uuid) } + ) + + this.addViewerToVideo({ + video, + viewerExpires, + viewerId, + viewerScope: 'remote', + // The origin server sends a summary of all viewers, so we can replace our local copy + replaceCurrentViewers: exists(viewerResultCounter), + viewerCount: viewerResultCounter ?? 1 + }) return true } @@ -83,17 +111,17 @@ export class VideoViewerCounters { let total = 0 for (const viewers of this.viewersPerVideo.values()) { - total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope).length + total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope) + .reduce((p, c) => p + c.viewerCount, 0) } return total } - getViewers (video: MVideo) { + getTotalViewersOf (video: MVideoImmutable) { const viewers = this.viewersPerVideo.get(video.id) - if (!viewers) return 0 - return viewers.length + return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0 } buildViewerExpireTime () { @@ -102,17 +130,19 @@ export class VideoViewerCounters { // --------------------------------------------------------------------------- - private async addViewerToVideo (options: { + private addViewerToVideo (options: { video: MVideoImmutable viewerId: string viewerScope: ViewerScope + viewerCount: number + replaceCurrentViewers?: boolean viewerExpires?: Date }) { - const { video, viewerExpires, viewerId, viewerScope } = options + const { video, viewerExpires, viewerId, viewerScope, viewerCount, replaceCurrentViewers } = options let watchers = this.viewersPerVideo.get(video.id) - if (!watchers) { + if (!watchers || replaceCurrentViewers) { watchers = [] this.viewersPerVideo.set(video.id, watchers) } @@ -125,12 +155,12 @@ export class VideoViewerCounters { ? 'remote' : 'local' - const viewer = { id: viewerId, expires, videoScope, viewerScope } + const viewer = { id: viewerId, expires, videoScope, viewerScope, viewerCount } watchers.push(viewer) this.idToViewer.set(viewerId, viewer) - await this.notifyClients(video.id, watchers.length) + this.notifyClients(video) return viewer } @@ -162,7 +192,16 @@ export class VideoViewerCounters { if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) else this.viewersPerVideo.set(videoId, newViewers) - await this.notifyClients(videoId, newViewers.length) + const video = await VideoModel.loadImmutableAttributes(videoId) + + if (video) { + this.notifyClients(video) + + // Let total viewers expire on remote instances if there are no more viewers + if (video.remote === false && newViewers.length !== 0) { + await this.federateTotalViewers(video) + } + } } } catch (err) { logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) @@ -171,13 +210,11 @@ export class VideoViewerCounters { this.processingViewerCounters = false } - private async notifyClients (videoId: string | number, viewersLength: number) { - const video = await VideoModel.loadImmutableAttributes(videoId) - if (!video) return + private notifyClients (video: MVideoImmutable) { + const totalViewers = this.getTotalViewersOf(video) + PeerTubeSocket.Instance.sendVideoViewsUpdate(video, totalViewers) - PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) - - logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags()) + logger.debug('Video viewers update for %s is %d.', video.url, totalViewers, lTags()) } private generateViewerId (ip: string, videoUUID: string) { @@ -190,8 +227,26 @@ export class VideoViewerCounters { const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75) if (viewer.lastFederation && viewer.lastFederation > federationLimit) return + if (video.remote === false && isUsingViewersFederationV2()) return + + await sendView({ + byActor: await getServerActor(), + video, + viewersCount: 1, + viewerIdentifier: viewer.id + }) - await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id }) viewer.lastFederation = now } + + private async federateTotalViewers (video: MVideoImmutable) { + if (!isUsingViewersFederationV2()) return + + await sendView({ + byActor: await getServerActor(), + video, + viewersCount: this.getTotalViewersOf(video), + viewerIdentifier: video.uuid + }) + } } diff --git a/server/core/lib/views/shared/video-views.ts b/server/core/lib/views/shared/video-views.ts index 3ce415dbe..ca1941f68 100644 --- a/server/core/lib/views/shared/video-views.ts +++ b/server/core/lib/views/shared/video-views.ts @@ -35,7 +35,7 @@ export class VideoViews { await this.addView(video) - await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() }) + await sendView({ byActor: await getServerActor(), video, viewerIdentifier: buildUUID() }) return true } diff --git a/server/core/lib/views/video-views-manager.ts b/server/core/lib/views/video-views-manager.ts index 9fc76d55e..0d20d5f34 100644 --- a/server/core/lib/views/video-views-manager.ts +++ b/server/core/lib/views/video-views-manager.ts @@ -66,17 +66,29 @@ export class VideoViewsManager { video: MVideo viewerId: string | null viewerExpires?: Date + viewerResultCounter?: number }) { - const { video, viewerId, viewerExpires } = options + const { video, viewerId, viewerExpires, viewerResultCounter } = options logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() }) - if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires }) - else await this.videoViews.addRemoteView({ video }) + // Viewer + if (viewerExpires) { + if (video.remote === false) { + this.videoViewerCounters.addRemoteViewerOnLocalVideo({ video, viewerId, viewerExpires }) + return + } + + this.videoViewerCounters.addRemoteViewerOnRemoteVideo({ video, viewerId, viewerExpires, viewerResultCounter }) + return + } + + // Just a view + await this.videoViews.addRemoteView({ video }) } - getViewers (video: MVideo) { - return this.videoViewerCounters.getViewers(video) + getTotalViewersOf (video: MVideo) { + return this.videoViewerCounters.getTotalViewersOf(video) } getTotalViewers (options: { diff --git a/server/core/models/video/formatter/video-api-format.ts b/server/core/models/video/formatter/video-api-format.ts index 958832485..a4bb0b733 100644 --- a/server/core/models/video/formatter/video-api-format.ts +++ b/server/core/models/video/formatter/video-api-format.ts @@ -90,7 +90,7 @@ export function videoModelToFormattedJSON (video: MVideoFormattable, options: Vi duration: video.duration, views: video.views, - viewers: VideoViewsManager.Instance.getViewers(video), + viewers: VideoViewsManager.Instance.getTotalViewersOf(video), likes: video.likes, dislikes: video.dislikes,