Viewers federation protocol v2

More efficient than the current one where instance is not fast enough to
send all viewers if a video becomes popular

The new protocol can be enabled by setting env
USE_VIEWERS_FEDERATION_V2='true'

Introduce a result field in View activity that contains the number of
viewers. This field is used by the origin instance to send the total
viewers on the video to remote instances. The difference with the
current protocol is that we don't have to send viewers individually to
remote instances.

There are 4 cases:
 * View activity from federation on Remote Video -> instance replaces
   all current viewers by a new viewer that contains the result counter
 * View activity from federation on Local Video -> instance adds the
   viewer without considering the result counter
 * Local view on Remote Video -> instance adds the viewer and send it to
   the origin instance
 * Local view on Local Video -> instance adds the viewer

Periodically PeerTube cleanups expired viewers. On local videos, the
instance sends to remote instances a View activity with the result
counter so they can update their viewers counter for that particular
video
This commit is contained in:
Chocobozzz 2023-12-01 14:53:17 +01:00
parent a73f476c8a
commit b4f4432459
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
13 changed files with 327 additions and 172 deletions

View File

@ -116,6 +116,11 @@ export interface ActivityView extends BaseActivity {
// If sending a "viewer" event
expires?: string
result?: {
type: 'InteractionCounter'
interactionType: 'WatchAction'
userInteractionCount: number
}
}
export interface ActivityDislike extends BaseActivity {

View File

@ -18,6 +18,7 @@ export interface VideoObject {
licence: ActivityIdentifierObject
language: ActivityIdentifierObject
subtitleLanguage: ActivityIdentifierObject[]
views: number
sensitive: boolean

View File

@ -56,3 +56,7 @@ export function isProdInstance () {
export function getAppNumber () {
return process.env.NODE_APP_INSTANCE || ''
}
export function isUsingViewersFederationV2 () {
return process.env.USE_VIEWERS_FEDERATION_V2 === 'true'
}

View File

@ -21,133 +21,171 @@ describe('Test video views/viewers counters', function () {
}
}
before(async function () {
this.timeout(120000)
function runTests () {
describe('Test views counter on VOD', function () {
let videoUUID: string
servers = await prepareViewsServers()
})
before(async function () {
this.timeout(120000)
describe('Test views counter on VOD', function () {
let videoUUID: string
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
videoUUID = uuid
await waitJobs(servers)
})
it('Should not view a video if watch time is below the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 0)
})
it('Should view a video if watch time is above the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 1)
})
it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 2)
})
it('Should view the video from server 2 and send the event', async function () {
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await waitJobs(servers)
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 3)
})
})
describe('Test views and viewers counters on live and VOD', function () {
let liveVideoId: string
let vodVideoId: string
let command: FfmpegCommand
before(async function () {
this.timeout(240000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
it('Should display no views and viewers', async function () {
await checkCounter('views', liveVideoId, 0)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 0)
await checkCounter('viewers', vodVideoId, 0)
})
it('Should view twice and display 1 view/viewer', async function () {
this.timeout(30000)
for (let i = 0; i < 3; i++) {
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await wait(1000)
}
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 1)
await checkCounter('viewers', vodVideoId, 1)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 1)
await checkCounter('views', vodVideoId, 1)
})
it('Should wait and display 0 viewers but still have 1 view', async function () {
this.timeout(45000)
let error = false
do {
try {
await checkCounter('views', liveVideoId, 1)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 1)
await checkCounter('viewers', vodVideoId, 0)
error = false
await wait(2500)
} catch {
error = true
}
} while (error)
})
it('Should view on a remote and on local and display appropriate views/viewers', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await wait(3000) // Throttled federation
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 2)
await checkCounter('viewers', vodVideoId, 3)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 3)
await checkCounter('views', vodVideoId, 4)
})
after(async function () {
await stopFfmpeg(command)
})
})
}
describe('Federation V1', function () {
before(async function () {
this.timeout(120000)
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
videoUUID = uuid
await waitJobs(servers)
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: false })
})
it('Should not view a video if watch time is below the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 0)
})
it('Should view a video if watch time is above the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 1)
})
it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 2)
})
it('Should view the video from server 2 and send the event', async function () {
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await waitJobs(servers)
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 3)
})
})
describe('Test views and viewers counters on live and VOD', function () {
let liveVideoId: string
let vodVideoId: string
let command: FfmpegCommand
before(async function () {
this.timeout(240000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
it('Should display no views and viewers', async function () {
await checkCounter('views', liveVideoId, 0)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 0)
await checkCounter('viewers', vodVideoId, 0)
})
it('Should view twice and display 1 view/viewer', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 1)
await checkCounter('viewers', vodVideoId, 1)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 1)
await checkCounter('views', vodVideoId, 1)
})
it('Should wait and display 0 viewers but still have 1 view', async function () {
this.timeout(30000)
await wait(12000)
await waitJobs(servers)
await checkCounter('views', liveVideoId, 1)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 1)
await checkCounter('viewers', vodVideoId, 0)
})
it('Should view on a remote and on local and display 2 viewers and 3 views', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 2)
await checkCounter('viewers', vodVideoId, 2)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 3)
await checkCounter('views', vodVideoId, 3)
})
runTests()
after(async function () {
await stopFfmpeg(command)
await cleanupTests(servers)
})
})
after(async function () {
await cleanupTests(servers)
describe('Federation V2', function () {
before(async function () {
this.timeout(120000)
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: true })
})
runTests()
after(async function () {
await cleanupTests(servers)
})
})
})

View File

@ -30,8 +30,17 @@ async function processViewsBuffer (servers: PeerTubeServer[]) {
await waitJobs(servers)
}
async function prepareViewsServers () {
const servers = await createMultipleServers(2)
async function prepareViewsServers (options: {
viewersFederationV2?: boolean
viewExpiration?: string // default 1 second
} = {}) {
const { viewExpiration = '1 second' } = options
const env = options?.viewersFederationV2 === true
? { USE_VIEWERS_FEDERATION_V2: 'true' }
: undefined
const servers = await createMultipleServers(2, { views: { videos: { ip_view_expiration: viewExpiration } } }, { env })
await setAccessTokensToServers(servers)
await setDefaultVideoChannel(servers)

View File

@ -196,11 +196,17 @@ const contextStore: { [ id in ContextType ]: (string | { [ id: string ]: string
uuid: 'sc:identifier'
}),
View: buildContext({
WatchAction: 'sc:WatchAction',
InteractionCounter: 'sc:InteractionCounter',
interactionType: 'sc:interactionType',
userInteractionCount: 'sc:userInteractionCount'
}),
Collection: buildContext(),
Follow: buildContext(),
Reject: buildContext(),
Accept: buildContext(),
View: buildContext(),
Announce: buildContext(),
Comment: buildContext(),
Delete: buildContext(),

View File

@ -9,7 +9,6 @@ export function Debounce (config: { timeoutMS: number }) {
timeoutRef = setTimeout(() => {
original.apply(this, args)
}, config.timeoutMS)
}
}

View File

@ -28,11 +28,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
allowRefresh: false
})
const viewerExpires = activity.expires
? new Date(activity.expires)
: undefined
await VideoViewsManager.Instance.processRemoteView({
video,
viewerId: activity.id,
await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires })
viewerExpires: activity.expires
? new Date(activity.expires)
: undefined,
viewerResultCounter: getViewerResultCounter(activity)
})
if (video.isOwned()) {
// Forward the view but don't resend the activity to the sender
@ -40,3 +44,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
}
}
// Viewer protocol V2
function getViewerResultCounter (activity: ActivityView) {
const result = activity.result
if (!activity.expires || result?.interactionType !== 'WatchAction' || result?.type !== 'InteractionCounter') return undefined
const counter = parseInt(result.userInteractionCount + '')
if (isNaN(counter)) return undefined
return counter
}

View File

@ -6,24 +6,23 @@ import { logger } from '../../../helpers/logger.js'
import { audiencify, getAudience } from '../audience.js'
import { getLocalVideoViewActivityPubUrl } from '../url.js'
import { sendVideoRelatedActivity } from './shared/send-utils.js'
type ViewType = 'view' | 'viewer'
import { isUsingViewersFederationV2 } from '@peertube/peertube-node-utils'
async function sendView (options: {
byActor: MActorLight
type: ViewType
video: MVideoImmutable
viewerIdentifier: string
viewersCount?: number
transaction?: Transaction
}) {
const { byActor, type, video, viewerIdentifier, transaction } = options
const { byActor, viewersCount, video, viewerIdentifier, transaction } = options
logger.info('Creating job to send %s of %s.', type, video.url)
logger.info('Creating job to send %s of %s.', viewersCount !== undefined ? 'viewer' : 'view', video.url)
const activityBuilder = (audience: ActivityAudience) => {
const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier)
return buildViewActivity({ url, byActor, video, audience, type })
return buildViewActivity({ url, byActor, video, audience, viewersCount })
}
return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
@ -41,22 +40,33 @@ function buildViewActivity (options: {
url: string
byActor: MActorAudience
video: MVideoUrl
type: ViewType
viewersCount?: number
audience?: ActivityAudience
}): ActivityView {
const { url, byActor, type, video, audience = getAudience(byActor) } = options
const { url, byActor, viewersCount, video, audience = getAudience(byActor) } = options
return audiencify(
{
id: url,
type: 'View' as 'View',
actor: byActor.url,
object: video.url,
const base = {
id: url,
type: 'View' as 'View',
actor: byActor.url,
object: video.url
}
expires: type === 'viewer'
? new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString()
: undefined
},
audience
)
if (viewersCount === undefined) {
return audiencify(base, audience)
}
return audiencify({
...base,
expires: new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString(),
result: isUsingViewersFederationV2()
? {
interactionType: 'WatchAction',
type: 'InteractionCounter',
userInteractionCount: viewersCount
}
: undefined
}, audience)
}

View File

@ -1,4 +1,5 @@
import { buildUUID, isTestOrDevInstance, sha256 } from '@peertube/peertube-node-utils'
import { buildUUID, isTestOrDevInstance, isUsingViewersFederationV2, sha256 } from '@peertube/peertube-node-utils'
import { exists } from '@server/helpers/custom-validators/misc.js'
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
import { VIEW_LIFETIME } from '@server/initializers/constants.js'
import { sendView } from '@server/lib/activitypub/send/send-view.js'
@ -17,6 +18,7 @@ type Viewer = {
id: string
viewerScope: ViewerScope
videoScope: VideoScope
viewerCount: number
lastFederation?: number
}
@ -54,22 +56,48 @@ export class VideoViewerCounters {
return false
}
const newViewer = await this.addViewerToVideo({ viewerId, video, viewerScope: 'local' })
const newViewer = this.addViewerToVideo({ viewerId, video, viewerScope: 'local', viewerCount: 1 })
await this.federateViewerIfNeeded(video, newViewer)
return true
}
async addRemoteViewer (options: {
addRemoteViewerOnLocalVideo (options: {
video: MVideo
viewerId: string
viewerExpires: Date
}) {
const { video, viewerExpires, viewerId } = options
logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
logger.debug('Adding remote viewer to local video %s.', video.uuid, { viewerId, viewerExpires, ...lTags(video.uuid) })
await this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote' })
this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote', viewerCount: 1 })
return true
}
addRemoteViewerOnRemoteVideo (options: {
video: MVideo
viewerId: string
viewerExpires: Date
viewerResultCounter?: number
}) {
const { video, viewerExpires, viewerId, viewerResultCounter } = options
logger.debug(
'Adding remote viewer to remote video %s.', video.uuid,
{ viewerId, viewerResultCounter, viewerExpires, ...lTags(video.uuid) }
)
this.addViewerToVideo({
video,
viewerExpires,
viewerId,
viewerScope: 'remote',
// The origin server sends a summary of all viewers, so we can replace our local copy
replaceCurrentViewers: exists(viewerResultCounter),
viewerCount: viewerResultCounter ?? 1
})
return true
}
@ -83,17 +111,17 @@ export class VideoViewerCounters {
let total = 0
for (const viewers of this.viewersPerVideo.values()) {
total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope).length
total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope)
.reduce((p, c) => p + c.viewerCount, 0)
}
return total
}
getViewers (video: MVideo) {
getTotalViewersOf (video: MVideoImmutable) {
const viewers = this.viewersPerVideo.get(video.id)
if (!viewers) return 0
return viewers.length
return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0
}
buildViewerExpireTime () {
@ -102,17 +130,19 @@ export class VideoViewerCounters {
// ---------------------------------------------------------------------------
private async addViewerToVideo (options: {
private addViewerToVideo (options: {
video: MVideoImmutable
viewerId: string
viewerScope: ViewerScope
viewerCount: number
replaceCurrentViewers?: boolean
viewerExpires?: Date
}) {
const { video, viewerExpires, viewerId, viewerScope } = options
const { video, viewerExpires, viewerId, viewerScope, viewerCount, replaceCurrentViewers } = options
let watchers = this.viewersPerVideo.get(video.id)
if (!watchers) {
if (!watchers || replaceCurrentViewers) {
watchers = []
this.viewersPerVideo.set(video.id, watchers)
}
@ -125,12 +155,12 @@ export class VideoViewerCounters {
? 'remote'
: 'local'
const viewer = { id: viewerId, expires, videoScope, viewerScope }
const viewer = { id: viewerId, expires, videoScope, viewerScope, viewerCount }
watchers.push(viewer)
this.idToViewer.set(viewerId, viewer)
await this.notifyClients(video.id, watchers.length)
this.notifyClients(video)
return viewer
}
@ -162,7 +192,16 @@ export class VideoViewerCounters {
if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
else this.viewersPerVideo.set(videoId, newViewers)
await this.notifyClients(videoId, newViewers.length)
const video = await VideoModel.loadImmutableAttributes(videoId)
if (video) {
this.notifyClients(video)
// Let total viewers expire on remote instances if there are no more viewers
if (video.remote === false && newViewers.length !== 0) {
await this.federateTotalViewers(video)
}
}
}
} catch (err) {
logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
@ -171,13 +210,11 @@ export class VideoViewerCounters {
this.processingViewerCounters = false
}
private async notifyClients (videoId: string | number, viewersLength: number) {
const video = await VideoModel.loadImmutableAttributes(videoId)
if (!video) return
private notifyClients (video: MVideoImmutable) {
const totalViewers = this.getTotalViewersOf(video)
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, totalViewers)
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
logger.debug('Video viewers update for %s is %d.', video.url, totalViewers, lTags())
}
private generateViewerId (ip: string, videoUUID: string) {
@ -190,8 +227,26 @@ export class VideoViewerCounters {
const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75)
if (viewer.lastFederation && viewer.lastFederation > federationLimit) return
if (video.remote === false && isUsingViewersFederationV2()) return
await sendView({
byActor: await getServerActor(),
video,
viewersCount: 1,
viewerIdentifier: viewer.id
})
await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id })
viewer.lastFederation = now
}
private async federateTotalViewers (video: MVideoImmutable) {
if (!isUsingViewersFederationV2()) return
await sendView({
byActor: await getServerActor(),
video,
viewersCount: this.getTotalViewersOf(video),
viewerIdentifier: video.uuid
})
}
}

View File

@ -35,7 +35,7 @@ export class VideoViews {
await this.addView(video)
await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() })
await sendView({ byActor: await getServerActor(), video, viewerIdentifier: buildUUID() })
return true
}

View File

@ -66,17 +66,29 @@ export class VideoViewsManager {
video: MVideo
viewerId: string | null
viewerExpires?: Date
viewerResultCounter?: number
}) {
const { video, viewerId, viewerExpires } = options
const { video, viewerId, viewerExpires, viewerResultCounter } = options
logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() })
if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires })
else await this.videoViews.addRemoteView({ video })
// Viewer
if (viewerExpires) {
if (video.remote === false) {
this.videoViewerCounters.addRemoteViewerOnLocalVideo({ video, viewerId, viewerExpires })
return
}
this.videoViewerCounters.addRemoteViewerOnRemoteVideo({ video, viewerId, viewerExpires, viewerResultCounter })
return
}
// Just a view
await this.videoViews.addRemoteView({ video })
}
getViewers (video: MVideo) {
return this.videoViewerCounters.getViewers(video)
getTotalViewersOf (video: MVideo) {
return this.videoViewerCounters.getTotalViewersOf(video)
}
getTotalViewers (options: {

View File

@ -90,7 +90,7 @@ export function videoModelToFormattedJSON (video: MVideoFormattable, options: Vi
duration: video.duration,
views: video.views,
viewers: VideoViewsManager.Instance.getViewers(video),
viewers: VideoViewsManager.Instance.getTotalViewersOf(video),
likes: video.likes,
dislikes: video.dislikes,