Refactor video views

Introduce viewers attribute for live videos
Count views for live videos
Reduce delay to see the viewer update for lives
Add ability to configure video views buffer interval and view ip
expiration
This commit is contained in:
Chocobozzz 2021-11-09 10:11:20 +01:00 committed by Chocobozzz
parent 221ee1adc9
commit 51353d9a03
31 changed files with 434 additions and 251 deletions

View File

@ -36,7 +36,7 @@ export class JobsComponent extends RestTable implements OnInit {
'video-live-ending', 'video-live-ending',
'video-redundancy', 'video-redundancy',
'video-transcoding', 'video-transcoding',
'videos-views', 'videos-views-stats',
'move-to-object-storage' 'move-to-object-storage'
] ]

View File

@ -658,7 +658,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
return this.peertubeSocket.getLiveVideosObservable() return this.peertubeSocket.getLiveVideosObservable()
.subscribe(({ type, payload }) => { .subscribe(({ type, payload }) => {
if (type === 'state-change') return this.handleLiveStateChange(payload.state) if (type === 'state-change') return this.handleLiveStateChange(payload.state)
if (type === 'views-change') return this.handleLiveViewsChange(payload.views) if (type === 'views-change') return this.handleLiveViewsChange(payload.viewers)
}) })
} }
@ -677,7 +677,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
this.loadVideo(videoUUID) this.loadVideo(videoUUID)
} }
private handleLiveViewsChange (newViews: number) { private handleLiveViewsChange (newViewers: number) {
if (!this.video) { if (!this.video) {
console.error('Cannot update video live views because video is no defined.') console.error('Cannot update video live views because video is no defined.')
return return
@ -685,7 +685,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
console.log('Updating live views.') console.log('Updating live views.')
this.video.views = newViews this.video.viewers = newViewers
} }
private initHotkeys () { private initHotkeys () {

View File

@ -57,6 +57,9 @@ export class Video implements VideoServerModel {
url: string url: string
views: number views: number
// If live
viewers?: number
likes: number likes: number
dislikes: number dislikes: number
nsfw: boolean nsfw: boolean
@ -150,6 +153,7 @@ export class Video implements VideoServerModel {
this.url = hash.url this.url = hash.url
this.views = hash.views this.views = hash.views
this.viewers = hash.viewers
this.likes = hash.likes this.likes = hash.likes
this.dislikes = hash.dislikes this.dislikes = hash.dislikes

View File

@ -4,6 +4,6 @@
</ng-container> </ng-container>
<ng-container i18n *ngIf="video.isLive"> <ng-container i18n *ngIf="video.isLive">
{video.views, plural, =1 {1 viewer} other {{{ video.views | myNumberFormatter }} viewers}} {video.viewers, plural, =1 {1 viewer} other {{{ video.viewers | myNumberFormatter }} viewers}}
</ng-container> </ng-container>
</span> </span>

View File

@ -232,6 +232,11 @@ views:
remote: remote:
max_age: '30 days' max_age: '30 days'
# PeerTube buffers local video views before updating and federating the video
local_buffer_update_interval: '30 minutes'
ip_view_expiration: '1 hour'
plugins: plugins:
# The website PeerTube will ask for available PeerTube plugins and themes # The website PeerTube will ask for available PeerTube plugins and themes
# This is an unmoderated plugin index, so only install plugins/themes you trust # This is an unmoderated plugin index, so only install plugins/themes you trust

View File

@ -230,6 +230,11 @@ views:
remote: remote:
max_age: '30 days' max_age: '30 days'
# PeerTube buffers local video views before updating and federating the video
local_buffer_update_interval: '30 minutes'
ip_view_expiration: '1 hour'
plugins: plugins:
# The website PeerTube will ask for available PeerTube plugins and themes # The website PeerTube will ask for available PeerTube plugins and themes
# This is an unmoderated plugin index, so only install plugins/themes you trust # This is an unmoderated plugin index, so only install plugins/themes you trust

View File

@ -160,3 +160,6 @@ views:
videos: videos:
remote: remote:
max_age: -1 max_age: -1
local_buffer_update_interval: '5 seconds'
ip_view_expiration: '1 second'

View File

@ -117,6 +117,7 @@ import { VideosRedundancyScheduler } from './server/lib/schedulers/videos-redund
import { RemoveOldHistoryScheduler } from './server/lib/schedulers/remove-old-history-scheduler' import { RemoveOldHistoryScheduler } from './server/lib/schedulers/remove-old-history-scheduler'
import { AutoFollowIndexInstances } from './server/lib/schedulers/auto-follow-index-instances' import { AutoFollowIndexInstances } from './server/lib/schedulers/auto-follow-index-instances'
import { RemoveDanglingResumableUploadsScheduler } from './server/lib/schedulers/remove-dangling-resumable-uploads-scheduler' import { RemoveDanglingResumableUploadsScheduler } from './server/lib/schedulers/remove-dangling-resumable-uploads-scheduler'
import { VideoViewsBufferScheduler } from './server/lib/schedulers/video-views-buffer-scheduler'
import { isHTTPSignatureDigestValid } from './server/helpers/peertube-crypto' import { isHTTPSignatureDigestValid } from './server/helpers/peertube-crypto'
import { PeerTubeSocket } from './server/lib/peertube-socket' import { PeerTubeSocket } from './server/lib/peertube-socket'
import { updateStreamingPlaylistsInfohashesIfNeeded } from './server/lib/hls' import { updateStreamingPlaylistsInfohashesIfNeeded } from './server/lib/hls'
@ -128,6 +129,7 @@ import { LiveManager } from './server/lib/live'
import { HttpStatusCode } from './shared/models/http/http-error-codes' import { HttpStatusCode } from './shared/models/http/http-error-codes'
import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache' import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache'
import { ServerConfigManager } from '@server/lib/server-config-manager' import { ServerConfigManager } from '@server/lib/server-config-manager'
import { VideoViews } from '@server/lib/video-views'
// ----------- Command line ----------- // ----------- Command line -----------
@ -296,11 +298,11 @@ async function startApplication () {
PeerTubeVersionCheckScheduler.Instance.enable() PeerTubeVersionCheckScheduler.Instance.enable()
AutoFollowIndexInstances.Instance.enable() AutoFollowIndexInstances.Instance.enable()
RemoveDanglingResumableUploadsScheduler.Instance.enable() RemoveDanglingResumableUploadsScheduler.Instance.enable()
VideoViewsBufferScheduler.Instance.enable()
// Redis initialization
Redis.Instance.init() Redis.Instance.init()
PeerTubeSocket.Instance.init(server) PeerTubeSocket.Instance.init(server)
VideoViews.Instance.init()
updateStreamingPlaylistsInfohashesIfNeeded() updateStreamingPlaylistsInfohashesIfNeeded()
.catch(err => logger.error('Cannot update streaming playlist infohashes.', { err })) .catch(err => logger.error('Cannot update streaming playlist infohashes.', { err }))

View File

@ -2,7 +2,7 @@ import express from 'express'
import toInt from 'validator/lib/toInt' import toInt from 'validator/lib/toInt'
import { pickCommonVideoQuery } from '@server/helpers/query' import { pickCommonVideoQuery } from '@server/helpers/query'
import { doJSONRequest } from '@server/helpers/requests' import { doJSONRequest } from '@server/helpers/requests'
import { LiveManager } from '@server/lib/live' import { VideoViews } from '@server/lib/video-views'
import { openapiOperationDoc } from '@server/middlewares/doc' import { openapiOperationDoc } from '@server/middlewares/doc'
import { getServerActor } from '@server/models/application/application' import { getServerActor } from '@server/models/application/application'
import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils'
@ -17,7 +17,6 @@ import { sequelizeTypescript } from '../../../initializers/database'
import { sendView } from '../../../lib/activitypub/send/send-view' import { sendView } from '../../../lib/activitypub/send/send-view'
import { JobQueue } from '../../../lib/job-queue' import { JobQueue } from '../../../lib/job-queue'
import { Hooks } from '../../../lib/plugins/hooks' import { Hooks } from '../../../lib/plugins/hooks'
import { Redis } from '../../../lib/redis'
import { import {
asyncMiddleware, asyncMiddleware,
asyncRetryTransactionMiddleware, asyncRetryTransactionMiddleware,
@ -107,7 +106,7 @@ videosRouter.get('/:id',
) )
videosRouter.post('/:id/views', videosRouter.post('/:id/views',
openapiOperationDoc({ operationId: 'addView' }), openapiOperationDoc({ operationId: 'addView' }),
asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')), asyncMiddleware(videosCustomGetValidator('only-video')),
asyncMiddleware(viewVideo) asyncMiddleware(viewVideo)
) )
@ -153,45 +152,18 @@ function getVideo (_req: express.Request, res: express.Response) {
} }
async function viewVideo (req: express.Request, res: express.Response) { async function viewVideo (req: express.Request, res: express.Response) {
const immutableVideoAttrs = res.locals.onlyImmutableVideo const video = res.locals.onlyVideo
const ip = req.ip const ip = req.ip
const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) const success = await VideoViews.Instance.processView({ video, ip })
if (exists) {
logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
return res.status(HttpStatusCode.NO_CONTENT_204).end()
}
const video = await VideoModel.load(immutableVideoAttrs.id) if (success) {
const promises: Promise<any>[] = [
Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
]
let federateView = true
// Increment our live manager
if (video.isLive && video.isOwned()) {
LiveManager.Instance.addViewTo(video.id)
// Views of our local live will be sent by our live manager
federateView = false
}
// Increment our video views cache counter
if (!video.isLive) {
promises.push(Redis.Instance.addVideoView(video.id))
}
if (federateView) {
const serverActor = await getServerActor() const serverActor = await getServerActor()
promises.push(sendView(serverActor, video, undefined)) await sendView(serverActor, video, undefined)
Hooks.runAction('action:api.video.viewed', { video: video, ip })
} }
await Promise.all(promises)
Hooks.runAction('action:api.video.viewed', { video, ip })
return res.status(HttpStatusCode.NO_CONTENT_204).end() return res.status(HttpStatusCode.NO_CONTENT_204).end()
} }

View File

@ -38,7 +38,7 @@ function checkMissedConfig () {
'services.twitter.username', 'services.twitter.whitelisted', 'services.twitter.username', 'services.twitter.whitelisted',
'followers.instance.enabled', 'followers.instance.manual_approval', 'followers.instance.enabled', 'followers.instance.manual_approval',
'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces',
'history.videos.max_age', 'views.videos.remote.max_age', 'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration',
'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max',
'theme.default', 'theme.default',
'remote_redundancy.videos.accept_from', 'remote_redundancy.videos.accept_from',

View File

@ -182,7 +182,9 @@ const CONFIG = {
VIDEOS: { VIDEOS: {
REMOTE: { REMOTE: {
MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age'))
} },
LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')),
IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration'))
} }
}, },
PLUGINS: { PLUGINS: {

View File

@ -148,7 +148,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
'video-import': 1, 'video-import': 1,
'email': 5, 'email': 5,
'actor-keys': 3, 'actor-keys': 3,
'videos-views': 1, 'videos-views-stats': 1,
'activitypub-refresher': 1, 'activitypub-refresher': 1,
'video-redundancy': 1, 'video-redundancy': 1,
'video-live-ending': 1, 'video-live-ending': 1,
@ -164,7 +164,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
'video-file-import': 1, 'video-file-import': 1,
'email': 5, 'email': 5,
'actor-keys': 1, 'actor-keys': 1,
'videos-views': 1, 'videos-views-stats': 1,
'activitypub-refresher': 1, 'activitypub-refresher': 1,
'video-redundancy': 1, 'video-redundancy': 1,
'video-live-ending': 10, 'video-live-ending': 10,
@ -181,14 +181,14 @@ const JOB_TTL: { [id in JobType]: number } = {
'video-import': 1000 * 3600 * 2, // 2 hours 'video-import': 1000 * 3600 * 2, // 2 hours
'email': 60000 * 10, // 10 minutes 'email': 60000 * 10, // 10 minutes
'actor-keys': 60000 * 20, // 20 minutes 'actor-keys': 60000 * 20, // 20 minutes
'videos-views': undefined, // Unlimited 'videos-views-stats': undefined, // Unlimited
'activitypub-refresher': 60000 * 10, // 10 minutes 'activitypub-refresher': 60000 * 10, // 10 minutes
'video-redundancy': 1000 * 3600 * 3, // 3 hours 'video-redundancy': 1000 * 3600 * 3, // 3 hours
'video-live-ending': 1000 * 60 * 10, // 10 minutes 'video-live-ending': 1000 * 60 * 10, // 10 minutes
'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
} }
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
'videos-views': { 'videos-views-stats': {
cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
}, },
'activitypub-cleaner': { 'activitypub-cleaner': {
@ -211,6 +211,7 @@ const SCHEDULER_INTERVALS_MS = {
REMOVE_OLD_JOBS: 60000 * 60, // 1 hour REMOVE_OLD_JOBS: 60000 * 60, // 1 hour
UPDATE_VIDEOS: 60000, // 1 minute UPDATE_VIDEOS: 60000, // 1 minute
YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day
VIDEO_VIEWS_BUFFER_UPDATE: CONFIG.VIEWS.VIDEOS.LOCAL_BUFFER_UPDATE_INTERVAL,
CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day
AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day
@ -343,8 +344,8 @@ const CONSTRAINTS_FIELDS = {
} }
const VIEW_LIFETIME = { const VIEW_LIFETIME = {
VIDEO: 60000 * 60, // 1 hour VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION,
LIVE: 60000 * 5 // 5 minutes VIEWER: 60000 * 5 // 5 minutes
} }
let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
@ -789,13 +790,12 @@ if (isTestInstance() === true) {
SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000
SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000
SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000
REPEAT_JOBS['videos-views'] = { every: 5000 } REPEAT_JOBS['videos-views-stats'] = { every: 5000 }
REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
VIEW_LIFETIME.VIDEO = 1000 // 1 second VIEW_LIFETIME.VIEWER = 1000 * 5 // 5 second
VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
CONTACT_FORM_LIFETIME = 1000 // 1 second CONTACT_FORM_LIFETIME = 1000 // 1 second
JOB_ATTEMPTS['email'] = 1 JOB_ATTEMPTS['email'] = 1

View File

@ -1,13 +1,13 @@
import { getOrCreateAPVideo } from '../videos' import { VideoViews } from '@server/lib/video-views'
import { forwardVideoRelatedActivity } from '../send/utils' import { ActivityView } from '../../../../shared/models/activitypub'
import { Redis } from '../../redis'
import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
import { APProcessorOptions } from '../../../types/activitypub-processor.model' import { APProcessorOptions } from '../../../types/activitypub-processor.model'
import { MActorSignature } from '../../../types/models' import { MActorSignature } from '../../../types/models'
import { LiveManager } from '@server/lib/live/live-manager' import { forwardVideoRelatedActivity } from '../send/utils'
import { getOrCreateAPVideo } from '../videos'
async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { async function processViewActivity (options: APProcessorOptions<ActivityView>) {
const { activity, byActor } = options const { activity, byActor } = options
return processCreateView(activity, byActor) return processCreateView(activity, byActor)
} }
@ -19,10 +19,8 @@ export {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { async function processCreateView (activity: ActivityView, byActor: MActorSignature) {
const videoObject = activity.type === 'View' const videoObject = activity.object
? activity.object
: (activity.object as ViewObject).object
const { video } = await getOrCreateAPVideo({ const { video } = await getOrCreateAPVideo({
videoObject, videoObject,
@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct
allowRefresh: false allowRefresh: false
}) })
if (!video.isLive) { const viewerExpires = activity.expires
await Redis.Instance.addVideoView(video.id) ? new Date(activity.expires)
} : undefined
await VideoViews.Instance.processView({ video, ip: null, viewerExpires })
if (video.isOwned()) { if (video.isOwned()) {
// Our live manager will increment the counter and send the view to followers
if (video.isLive) {
LiveManager.Instance.addViewTo(video.id)
return
}
// Forward the view but don't resend the activity to the sender // Forward the view but don't resend the activity to the sender
const exceptions = [ byActor ] const exceptions = [ byActor ]
await forwardVideoRelatedActivity(activity, undefined, exceptions, video) await forwardVideoRelatedActivity(activity, undefined, exceptions, video)

View File

@ -1,4 +1,5 @@
import { Transaction } from 'sequelize' import { Transaction } from 'sequelize'
import { VideoViews } from '@server/lib/video-views'
import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models'
import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU
id: url, id: url,
type: 'View' as 'View', type: 'View' as 'View',
actor: byActor.url, actor: byActor.url,
object: video.url object: video.url,
expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString()
}, },
audience audience
) )

View File

@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
if (videoUpdated.isLive) { if (videoUpdated.isLive) {
PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
} }
logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags())

View File

@ -1,11 +1,10 @@
import { Redis } from '../../redis' import { isTestInstance } from '../../../helpers/core-utils'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { VideoModel } from '../../../models/video/video' import { VideoModel } from '../../../models/video/video'
import { VideoViewModel } from '../../../models/video/video-view' import { VideoViewModel } from '../../../models/video/video-view'
import { isTestInstance } from '../../../helpers/core-utils' import { Redis } from '../../redis'
import { federateVideoIfNeeded } from '../../activitypub/videos'
async function processVideosViews () { async function processVideosViewsStats () {
const lastHour = new Date() const lastHour = new Date()
// In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
@ -15,23 +14,23 @@ async function processVideosViews () {
const startDate = lastHour.setMinutes(0, 0, 0) const startDate = lastHour.setMinutes(0, 0, 0)
const endDate = lastHour.setMinutes(59, 59, 999) const endDate = lastHour.setMinutes(59, 59, 999)
const videoIds = await Redis.Instance.getVideosIdViewed(hour) const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
if (videoIds.length === 0) return if (videoIds.length === 0) return
logger.info('Processing videos views in job for hour %d.', hour) logger.info('Processing videos views stats in job for hour %d.', hour)
for (const videoId of videoIds) { for (const videoId of videoIds) {
try { try {
const views = await Redis.Instance.getVideoViews(videoId, hour) const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
await Redis.Instance.deleteVideoViews(videoId, hour) await Redis.Instance.deleteVideoViewsStats(videoId, hour)
if (views) { if (views) {
logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
try { try {
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) const video = await VideoModel.load(videoId)
if (!video) { if (!video) {
logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
continue continue
} }
@ -41,21 +40,12 @@ async function processVideosViews () {
views, views,
videoId videoId
}) })
if (video.isOwned()) {
// If this is a remote video, the origin instance will send us an update
await VideoModel.incrementViews(videoId, views)
// Send video update
video.views += views
await federateVideoIfNeeded(video, false)
}
} catch (err) { } catch (err) {
logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
} }
} }
} catch (err) { } catch (err) {
logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
} }
} }
} }
@ -63,5 +53,5 @@ async function processVideosViews () {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
export { export {
processVideosViews processVideosViewsStats
} }

View File

@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import' import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViews } from './handlers/video-views' import { processVideosViewsStats } from './handlers/video-views-stats'
type CreateJobArgument = type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@ -49,7 +49,7 @@ type CreateJobArgument =
{ type: 'email', payload: EmailPayload } | { type: 'email', payload: EmailPayload } |
{ type: 'video-import', payload: VideoImportPayload } | { type: 'video-import', payload: VideoImportPayload } |
{ type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } |
{ type: 'videos-views', payload: {} } | { type: 'videos-views-stats', payload: {} } |
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
{ type: 'actor-keys', payload: ActorKeysPayload } | { type: 'actor-keys', payload: ActorKeysPayload } |
{ type: 'video-redundancy', payload: VideoRedundancyPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } |
@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
'video-transcoding': processVideoTranscoding, 'video-transcoding': processVideoTranscoding,
'email': processEmail, 'email': processEmail,
'video-import': processVideoImport, 'video-import': processVideoImport,
'videos-views': processVideosViews, 'videos-views-stats': processVideosViewsStats,
'activitypub-refresher': refreshAPObject, 'activitypub-refresher': refreshAPObject,
'video-live-ending': processVideoLiveEnding, 'video-live-ending': processVideoLiveEnding,
'actor-keys': processActorKeys, 'actor-keys': processActorKeys,
@ -89,7 +89,7 @@ const jobTypes: JobType[] = [
'video-transcoding', 'video-transcoding',
'video-file-import', 'video-file-import',
'video-import', 'video-import',
'videos-views', 'videos-views-stats',
'activitypub-refresher', 'activitypub-refresher',
'video-redundancy', 'video-redundancy',
'actor-keys', 'actor-keys',
@ -247,8 +247,8 @@ class JobQueue {
} }
private addRepeatableJobs () { private addRepeatableJobs () {
this.queues['videos-views'].add({}, { this.queues['videos-views-stats'].add({}, {
repeat: REPEAT_JOBS['videos-views'] repeat: REPEAT_JOBS['videos-views-stats']
}).catch(err => logger.error('Cannot add repeatable job.', { err })) }).catch(err => logger.error('Cannot add repeatable job.', { err }))
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {

View File

@ -2,7 +2,6 @@
import { readFile } from 'fs-extra' import { readFile } from 'fs-extra'
import { createServer, Server } from 'net' import { createServer, Server } from 'net'
import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
import { isTestInstance } from '@server/helpers/core-utils'
import { import {
computeResolutionsToTranscode, computeResolutionsToTranscode,
ffprobePromise, ffprobePromise,
@ -12,7 +11,7 @@ import {
} from '@server/helpers/ffprobe-utils' } from '@server/helpers/ffprobe-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger' import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
import { UserModel } from '@server/models/user/user' import { UserModel } from '@server/models/user/user'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { VideoLiveModel } from '@server/models/video/video-live' import { VideoLiveModel } from '@server/models/video/video-live'
@ -53,8 +52,6 @@ class LiveManager {
private readonly muxingSessions = new Map<string, MuxingSession>() private readonly muxingSessions = new Map<string, MuxingSession>()
private readonly videoSessions = new Map<number, string>() private readonly videoSessions = new Map<number, string>()
// Values are Date().getTime()
private readonly watchersPerVideo = new Map<number, number[]>()
private rtmpServer: Server private rtmpServer: Server
private rtmpsServer: ServerTLS private rtmpsServer: ServerTLS
@ -99,8 +96,6 @@ class LiveManager {
// Cleanup broken lives, that were terminated by a server restart for example // Cleanup broken lives, that were terminated by a server restart for example
this.handleBrokenLives() this.handleBrokenLives()
.catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
} }
async run () { async run () {
@ -184,19 +179,6 @@ class LiveManager {
this.abortSession(sessionId) this.abortSession(sessionId)
} }
addViewTo (videoId: number) {
if (this.videoSessions.has(videoId) === false) return
let watchers = this.watchersPerVideo.get(videoId)
if (!watchers) {
watchers = []
this.watchersPerVideo.set(videoId, watchers)
}
watchers.push(new Date().getTime())
}
private getContext () { private getContext () {
return context return context
} }
@ -377,7 +359,6 @@ class LiveManager {
} }
private onMuxingFFmpegEnd (videoId: number) { private onMuxingFFmpegEnd (videoId: number) {
this.watchersPerVideo.delete(videoId)
this.videoSessions.delete(videoId) this.videoSessions.delete(videoId)
} }
@ -411,34 +392,6 @@ class LiveManager {
} }
} }
private async updateLiveViews () {
if (!this.isRunning()) return
if (!isTestInstance()) logger.info('Updating live video views.', lTags())
for (const videoId of this.watchersPerVideo.keys()) {
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
const watchers = this.watchersPerVideo.get(videoId)
const numWatchers = watchers.length
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
video.views = numWatchers
await video.save()
await federateVideoIfNeeded(video, false)
PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
// Only keep not expired watchers
const newWatchers = watchers.filter(w => w > notBefore)
this.watchersPerVideo.set(videoId, newWatchers)
logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
}
}
private async handleBrokenLives () { private async handleBrokenLives () {
const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()

View File

@ -1,7 +1,7 @@
import { Server as HTTPServer } from 'http' import { Server as HTTPServer } from 'http'
import { Namespace, Server as SocketServer, Socket } from 'socket.io' import { Namespace, Server as SocketServer, Socket } from 'socket.io'
import { isIdValid } from '@server/helpers/custom-validators/misc' import { isIdValid } from '@server/helpers/custom-validators/misc'
import { MVideo } from '@server/types/models' import { MVideo, MVideoImmutable } from '@server/types/models'
import { UserNotificationModelForApi } from '@server/types/models/user' import { UserNotificationModelForApi } from '@server/types/models/user'
import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
import { logger } from '../helpers/logger' import { logger } from '../helpers/logger'
@ -78,11 +78,11 @@ class PeerTubeSocket {
.emit(type, data) .emit(type, data)
} }
sendVideoViewsUpdate (video: MVideo) { sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
const data: LiveVideoEventPayload = { views: video.views } const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
const type: LiveVideoEventType = 'views-change' const type: LiveVideoEventType = 'views-change'
logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
this.liveVideosNamespace this.liveVideosNamespace
.in(video.id) .in(video.id)

View File

@ -13,6 +13,7 @@ import {
RESUMABLE_UPLOAD_SESSION_LIFETIME RESUMABLE_UPLOAD_SESSION_LIFETIME
} from '../initializers/constants' } from '../initializers/constants'
import { CONFIG } from '../initializers/config' import { CONFIG } from '../initializers/config'
import { exists } from '@server/helpers/custom-validators/misc'
type CachedRoute = { type CachedRoute = {
body: string body: string
@ -119,16 +120,20 @@ class Redis {
/* ************ Views per IP ************ */ /* ************ Views per IP ************ */
setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { setIPVideoView (ip: string, videoUUID: string) {
const lifetime = isLive return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
? VIEW_LIFETIME.LIVE }
: VIEW_LIFETIME.VIDEO
return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) setIPVideoViewer (ip: string, videoUUID: string) {
return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER)
} }
async doesVideoIPViewExist (ip: string, videoUUID: string) { async doesVideoIPViewExist (ip: string, videoUUID: string) {
return this.exists(this.generateViewKey(ip, videoUUID)) return this.exists(this.generateIPViewKey(ip, videoUUID))
}
async doesVideoIPViewerExist (ip: string, videoUUID: string) {
return this.exists(this.generateIPViewerKey(ip, videoUUID))
} }
/* ************ Tracker IP block ************ */ /* ************ Tracker IP block ************ */
@ -160,46 +165,85 @@ class Redis {
return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) return this.setObject(this.generateCachedRouteKey(req), cached, lifetime)
} }
/* ************ Video views ************ */ /* ************ Video views stats ************ */
addVideoView (videoId: number) { addVideoViewStats (videoId: number) {
const keyIncr = this.generateVideoViewKey(videoId) const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId })
const keySet = this.generateVideosViewKey()
return Promise.all([ return Promise.all([
this.addToSet(keySet, videoId.toString()), this.addToSet(setKey, videoId.toString()),
this.increment(keyIncr) this.increment(videoKey)
]) ])
} }
async getVideoViews (videoId: number, hour: number) { async getVideoViewsStats (videoId: number, hour: number) {
const key = this.generateVideoViewKey(videoId, hour) const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
const valueString = await this.getValue(key) const valueString = await this.getValue(videoKey)
const valueInt = parseInt(valueString, 10) const valueInt = parseInt(valueString, 10)
if (isNaN(valueInt)) { if (isNaN(valueInt)) {
logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
return undefined return undefined
} }
return valueInt return valueInt
} }
async getVideosIdViewed (hour: number) { async listVideosViewedForStats (hour: number) {
const key = this.generateVideosViewKey(hour) const { setKey } = this.generateVideoViewStatsKeys({ hour })
const stringIds = await this.getSet(key) const stringIds = await this.getSet(setKey)
return stringIds.map(s => parseInt(s, 10)) return stringIds.map(s => parseInt(s, 10))
} }
deleteVideoViews (videoId: number, hour: number) { deleteVideoViewsStats (videoId: number, hour: number) {
const keySet = this.generateVideosViewKey(hour) const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
const keyIncr = this.generateVideoViewKey(videoId, hour)
return Promise.all([ return Promise.all([
this.deleteFromSet(keySet, videoId.toString()), this.deleteFromSet(setKey, videoId.toString()),
this.deleteKey(keyIncr) this.deleteKey(videoKey)
])
}
/* ************ Local video views buffer ************ */
addLocalVideoView (videoId: number) {
const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId)
return Promise.all([
this.addToSet(setKey, videoId.toString()),
this.increment(videoKey)
])
}
async getLocalVideoViews (videoId: number) {
const { videoKey } = this.generateLocalVideoViewsKeys(videoId)
const valueString = await this.getValue(videoKey)
const valueInt = parseInt(valueString, 10)
if (isNaN(valueInt)) {
logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString)
return undefined
}
return valueInt
}
async listLocalVideosViewed () {
const { setKey } = this.generateLocalVideoViewsKeys()
const stringIds = await this.getSet(setKey)
return stringIds.map(s => parseInt(s, 10))
}
deleteLocalVideoViews (videoId: number) {
const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId)
return Promise.all([
this.deleteFromSet(setKey, videoId.toString()),
this.deleteKey(videoKey)
]) ])
} }
@ -233,16 +277,16 @@ class Redis {
return req.method + '-' + req.originalUrl return req.method + '-' + req.originalUrl
} }
private generateVideosViewKey (hour?: number) { private generateLocalVideoViewsKeys (videoId?: Number) {
if (!hour) hour = new Date().getHours() return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` }
return `videos-view-h${hour}`
} }
private generateVideoViewKey (videoId: number, hour?: number) { private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) {
if (hour === undefined || hour === null) hour = new Date().getHours() const hour = exists(options.hour)
? options.hour
: new Date().getHours()
return `video-view-${videoId}-h${hour}` return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` }
} }
private generateResetPasswordKey (userId: number) { private generateResetPasswordKey (userId: number) {
@ -253,10 +297,14 @@ class Redis {
return 'verify-email-' + userId return 'verify-email-' + userId
} }
private generateViewKey (ip: string, videoUUID: string) { private generateIPViewKey (ip: string, videoUUID: string) {
return `views-${videoUUID}-${ip}` return `views-${videoUUID}-${ip}`
} }
private generateIPViewerKey (ip: string, videoUUID: string) {
return `viewer-${videoUUID}-${ip}`
}
private generateTrackerBlockIPKey (ip: string) { private generateTrackerBlockIPKey (ip: string) {
return `tracker-block-ip-${ip}` return `tracker-block-ip-${ip}`
} }

View File

@ -0,0 +1,52 @@
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { VideoModel } from '@server/models/video/video'
import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { federateVideoIfNeeded } from '../activitypub/videos'
import { Redis } from '../redis'
import { AbstractScheduler } from './abstract-scheduler'
const lTags = loggerTagsFactory('views')
export class VideoViewsBufferScheduler extends AbstractScheduler {
private static instance: AbstractScheduler
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
private constructor () {
super()
}
protected async internalExecute () {
const videoIds = await Redis.Instance.listLocalVideosViewed()
if (videoIds.length === 0) return
logger.info('Processing local video views buffer.', { videoIds, ...lTags() })
for (const videoId of videoIds) {
try {
const views = await Redis.Instance.getLocalVideoViews(videoId)
await Redis.Instance.deleteLocalVideoViews(videoId)
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
if (!video) {
logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
continue
}
// If this is a remote video, the origin instance will send us an update
await VideoModel.incrementViews(videoId, views)
// Send video update
video.views += views
await federateVideoIfNeeded(video, false)
} catch (err) {
logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
}
}
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}

130
server/lib/video-views.ts Normal file
View File

@ -0,0 +1,130 @@
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { VIEW_LIFETIME } from '@server/initializers/constants'
import { VideoModel } from '@server/models/video/video'
import { MVideo } from '@server/types/models'
import { PeerTubeSocket } from './peertube-socket'
import { Redis } from './redis'
const lTags = loggerTagsFactory('views')
export class VideoViews {
// Values are Date().getTime()
private readonly viewersPerVideo = new Map<number, number[]>()
private static instance: VideoViews
private constructor () {
}
init () {
setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER)
}
async processView (options: {
video: MVideo
ip: string | null
viewerExpires?: Date
}) {
const { video, ip, viewerExpires } = options
logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags())
let success = await this.addView(video, ip)
if (video.isLive) {
const successViewer = await this.addViewer(video, ip, viewerExpires)
success ||= successViewer
}
return success
}
getViewers (video: MVideo) {
const viewers = this.viewersPerVideo.get(video.id)
if (!viewers) return 0
return viewers.length
}
buildViewerExpireTime () {
return new Date().getTime() + VIEW_LIFETIME.VIEWER
}
private async addView (video: MVideo, ip: string | null) {
const promises: Promise<any>[] = []
if (ip !== null) {
const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
if (viewExists) return false
promises.push(Redis.Instance.setIPVideoView(ip, video.uuid))
}
if (video.isOwned()) {
promises.push(Redis.Instance.addLocalVideoView(video.id))
}
promises.push(Redis.Instance.addVideoViewStats(video.id))
await Promise.all(promises)
return true
}
private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) {
if (ip !== null) {
const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
if (viewExists) return false
await Redis.Instance.setIPVideoViewer(ip, video.uuid)
}
let watchers = this.viewersPerVideo.get(video.id)
if (!watchers) {
watchers = []
this.viewersPerVideo.set(video.id, watchers)
}
const expiration = viewerExpires
? viewerExpires.getTime()
: this.buildViewerExpireTime()
watchers.push(expiration)
await this.notifyClients(video.id, watchers.length)
return true
}
private async cleanViewers () {
logger.info('Cleaning video viewers.', lTags())
for (const videoId of this.viewersPerVideo.keys()) {
const notBefore = new Date().getTime()
const viewers = this.viewersPerVideo.get(videoId)
// Only keep not expired viewers
const newViewers = viewers.filter(w => w > notBefore)
if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
else this.viewersPerVideo.set(videoId, newViewers)
await this.notifyClients(videoId, newViewers.length)
}
}
private async notifyClients (videoId: string | number, viewersLength: number) {
const video = await VideoModel.loadImmutableAttributes(videoId)
if (!video) return
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags())
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}

View File

@ -1,6 +1,7 @@
import { uuidToShort } from '@server/helpers/uuid' import { uuidToShort } from '@server/helpers/uuid'
import { generateMagnetUri } from '@server/helpers/webtorrent' import { generateMagnetUri } from '@server/helpers/webtorrent'
import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls'
import { VideoViews } from '@server/lib/video-views'
import { VideosCommonQueryAfterSanitize } from '@shared/models' import { VideosCommonQueryAfterSanitize } from '@shared/models'
import { VideoFile } from '@shared/models/videos/video-file.model' import { VideoFile } from '@shared/models/videos/video-file.model'
import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects'
@ -121,6 +122,10 @@ function videoModelToFormattedJSON (video: MVideoFormattable, options: VideoForm
pluginData: (video as any).pluginData pluginData: (video as any).pluginData
} }
if (video.isLive) {
videoObject.viewers = VideoViews.Instance.getViewers(video)
}
const add = options.additionalAttributes const add = options.additionalAttributes
if (add?.state === true) { if (add?.state === true) {
videoObject.state = { videoObject.state = {

View File

@ -19,7 +19,7 @@ import {
const expect = chai.expect const expect = chai.expect
describe('Test live', function () { describe('Live views', function () {
let servers: PeerTubeServer[] = [] let servers: PeerTubeServer[] = []
before(async function () { before(async function () {
@ -47,79 +47,86 @@ describe('Test live', function () {
await doubleFollow(servers[0], servers[1]) await doubleFollow(servers[0], servers[1])
}) })
describe('Live views', function () { let liveVideoId: string
let liveVideoId: string let command: FfmpegCommand
let command: FfmpegCommand
async function countViews (expected: number) { async function countViewers (expectedViewers: number) {
for (const server of servers) { for (const server of servers) {
const video = await server.videos.get({ id: liveVideoId }) const video = await server.videos.get({ id: liveVideoId })
expect(video.views).to.equal(expected) expect(video.viewers).to.equal(expectedViewers)
} }
}
async function countViews (expectedViews: number) {
for (const server of servers) {
const video = await server.videos.get({ id: liveVideoId })
expect(video.views).to.equal(expectedViews)
}
}
before(async function () {
this.timeout(30000)
const liveAttributes = {
name: 'live video',
channelId: servers[0].store.channel.id,
privacy: VideoPrivacy.PUBLIC
} }
before(async function () { const live = await servers[0].live.create({ fields: liveAttributes })
this.timeout(30000) liveVideoId = live.uuid
const liveAttributes = { command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId })
name: 'live video', await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
channelId: servers[0].store.channel.id, await waitJobs(servers)
privacy: VideoPrivacy.PUBLIC })
}
const live = await servers[0].live.create({ fields: liveAttributes }) it('Should display no views and viewers for a live', async function () {
liveVideoId = live.uuid await countViews(0)
await countViewers(0)
})
command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) it('Should view a live twice and display 1 view/viewer', async function () {
await waitUntilLivePublishedOnAllServers(servers, liveVideoId) this.timeout(30000)
await waitJobs(servers)
})
it('Should display no views for a live', async function () { await servers[0].videos.view({ id: liveVideoId })
await countViews(0) await servers[0].videos.view({ id: liveVideoId })
})
it('Should view a live twice and display 1 view', async function () { await waitJobs(servers)
this.timeout(30000) await countViewers(1)
await servers[0].videos.view({ id: liveVideoId }) await wait(7000)
await servers[0].videos.view({ id: liveVideoId }) await countViews(1)
})
await wait(7000) it('Should wait and display 0 viewers while still have 1 view', async function () {
this.timeout(30000)
await waitJobs(servers) await wait(12000)
await waitJobs(servers)
await countViews(1) await countViews(1)
}) await countViewers(0)
})
it('Should wait and display 0 views', async function () { it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () {
this.timeout(30000) this.timeout(30000)
await wait(12000) await servers[0].videos.view({ id: liveVideoId })
await waitJobs(servers) await servers[1].videos.view({ id: liveVideoId })
await servers[1].videos.view({ id: liveVideoId })
await waitJobs(servers)
await countViews(0) await countViewers(2)
})
it('Should view a live on a remote and on local and display 2 views', async function () { await wait(7000)
this.timeout(30000) await waitJobs(servers)
await servers[0].videos.view({ id: liveVideoId }) await countViews(3)
await servers[1].videos.view({ id: liveVideoId })
await servers[1].videos.view({ id: liveVideoId })
await wait(7000)
await waitJobs(servers)
await countViews(2)
})
after(async function () {
await stopFfmpeg(command)
})
}) })
after(async function () { after(async function () {
await stopFfmpeg(command)
await cleanupTests(servers) await cleanupTests(servers)
}) })
}) })

View File

@ -56,7 +56,7 @@ describe('Test jobs', function () {
let job = body.data[0] let job = body.data[0]
// Skip repeat jobs // Skip repeat jobs
if (job.type === 'videos-views') job = body.data[1] if (job.type === 'videos-views-stats') job = body.data[1]
expect(job.state).to.equal('completed') expect(job.state).to.equal('completed')
expect(job.type.startsWith('activitypub-')).to.be.true expect(job.type.startsWith('activitypub-')).to.be.true

View File

@ -1,5 +1,5 @@
import { JobState } from '../../models' import { JobState, JobType } from '../../models'
import { wait } from '../miscs' import { wait } from '../miscs'
import { PeerTubeServer } from './server' import { PeerTubeServer } from './server'
@ -16,7 +16,7 @@ async function waitJobs (serversArg: PeerTubeServer[] | PeerTubeServer, skipDela
const states: JobState[] = [ 'waiting', 'active' ] const states: JobState[] = [ 'waiting', 'active' ]
if (!skipDelayed) states.push('delayed') if (!skipDelayed) states.push('delayed')
const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ] const repeatableJobs: JobType[] = [ 'videos-views-stats', 'activitypub-cleaner' ]
let pendingRequests: boolean let pendingRequests: boolean
function tasksBuilder () { function tasksBuilder () {

View File

@ -6,7 +6,6 @@ import { DislikeObject } from './objects/dislike-object'
import { APObject } from './objects/object.model' import { APObject } from './objects/object.model'
import { PlaylistObject } from './objects/playlist-object' import { PlaylistObject } from './objects/playlist-object'
import { VideoCommentObject } from './objects/video-comment-object' import { VideoCommentObject } from './objects/video-comment-object'
import { ViewObject } from './objects/view-object'
export type Activity = export type Activity =
ActivityCreate | ActivityCreate |
@ -53,7 +52,7 @@ export interface BaseActivity {
export interface ActivityCreate extends BaseActivity { export interface ActivityCreate extends BaseActivity {
type: 'Create' type: 'Create'
object: VideoObject | AbuseObject | ViewObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject object: VideoObject | AbuseObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject
} }
export interface ActivityUpdate extends BaseActivity { export interface ActivityUpdate extends BaseActivity {
@ -100,6 +99,7 @@ export interface ActivityView extends BaseActivity {
type: 'View' type: 'View'
actor: string actor: string
object: APObject object: APObject
expires: string
} }
export interface ActivityDislike extends BaseActivity { export interface ActivityDislike extends BaseActivity {

View File

@ -14,7 +14,7 @@ export type JobType =
| 'video-transcoding' | 'video-transcoding'
| 'email' | 'email'
| 'video-import' | 'video-import'
| 'videos-views' | 'videos-views-stats'
| 'activitypub-refresher' | 'activitypub-refresher'
| 'video-redundancy' | 'video-redundancy'
| 'video-live-ending' | 'video-live-ending'

View File

@ -2,5 +2,9 @@ import { VideoState } from '../video-state.enum'
export interface LiveVideoEventPayload { export interface LiveVideoEventPayload {
state?: VideoState state?: VideoState
// FIXME: deprecated in 4.0 in favour of viewers
views?: number views?: number
viewers?: number
} }

View File

@ -39,6 +39,9 @@ export interface Video {
url: string url: string
views: number views: number
// If live
viewers?: number
likes: number likes: number
dislikes: number dislikes: number
nsfw: boolean nsfw: boolean

View File

@ -4892,7 +4892,7 @@ components:
- video-transcoding - video-transcoding
- video-file-import - video-file-import
- video-import - video-import
- videos-views - videos-views-stats
- activitypub-refresher - activitypub-refresher
- video-redundancy - video-redundancy
- video-live-ending - video-live-ending
@ -5397,6 +5397,9 @@ components:
- $ref: '#/components/schemas/Video' - $ref: '#/components/schemas/Video'
- type: object - type: object
properties: properties:
viewers:
type: integer
description: If the video is a live, you have the amount of current viewers
descriptionPath: descriptionPath:
type: string type: string
example: /api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/description example: /api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/description
@ -6300,7 +6303,7 @@ components:
- video-transcoding - video-transcoding
- email - email
- video-import - video-import
- videos-views - videos-views-stats
- activitypub-refresher - activitypub-refresher
- video-redundancy - video-redundancy
data: data: