Handle views for live videos
This commit is contained in:
parent
529f037294
commit
e4bf785617
|
@ -66,6 +66,7 @@ import { liveRouter } from './live'
|
||||||
import { ownershipVideoRouter } from './ownership'
|
import { ownershipVideoRouter } from './ownership'
|
||||||
import { rateVideoRouter } from './rate'
|
import { rateVideoRouter } from './rate'
|
||||||
import { watchingRouter } from './watching'
|
import { watchingRouter } from './watching'
|
||||||
|
import { LiveManager } from '@server/lib/live-manager'
|
||||||
|
|
||||||
const auditLogger = auditLoggerFactory('videos')
|
const auditLogger = auditLoggerFactory('videos')
|
||||||
const videosRouter = express.Router()
|
const videosRouter = express.Router()
|
||||||
|
@ -416,26 +417,46 @@ async function getVideo (req: express.Request, res: express.Response) {
|
||||||
}
|
}
|
||||||
|
|
||||||
async function viewVideo (req: express.Request, res: express.Response) {
|
async function viewVideo (req: express.Request, res: express.Response) {
|
||||||
const videoInstance = res.locals.onlyImmutableVideo
|
const immutableVideoAttrs = res.locals.onlyImmutableVideo
|
||||||
|
|
||||||
const ip = req.ip
|
const ip = req.ip
|
||||||
const exists = await Redis.Instance.doesVideoIPViewExist(ip, videoInstance.uuid)
|
const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid)
|
||||||
if (exists) {
|
if (exists) {
|
||||||
logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid)
|
logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
|
||||||
return res.status(204).end()
|
return res.sendStatus(204)
|
||||||
}
|
}
|
||||||
|
|
||||||
await Promise.all([
|
const video = await VideoModel.load(immutableVideoAttrs.id)
|
||||||
Redis.Instance.addVideoView(videoInstance.id),
|
|
||||||
Redis.Instance.setIPVideoView(ip, videoInstance.uuid)
|
|
||||||
])
|
|
||||||
|
|
||||||
|
const promises: Promise<any>[] = [
|
||||||
|
Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
|
||||||
|
]
|
||||||
|
|
||||||
|
let federateView = true
|
||||||
|
|
||||||
|
// Increment our live manager
|
||||||
|
if (video.isLive && video.isOwned()) {
|
||||||
|
LiveManager.Instance.addViewTo(video.id)
|
||||||
|
|
||||||
|
// Views of our local live will be sent by our live manager
|
||||||
|
federateView = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increment our video views cache counter
|
||||||
|
if (!video.isLive) {
|
||||||
|
promises.push(Redis.Instance.addVideoView(video.id))
|
||||||
|
}
|
||||||
|
|
||||||
|
if (federateView) {
|
||||||
const serverActor = await getServerActor()
|
const serverActor = await getServerActor()
|
||||||
await sendView(serverActor, videoInstance, undefined)
|
promises.push(sendView(serverActor, video, undefined))
|
||||||
|
}
|
||||||
|
|
||||||
Hooks.runAction('action:api.video.viewed', { video: videoInstance, ip })
|
await Promise.all(promises)
|
||||||
|
|
||||||
return res.status(204).end()
|
Hooks.runAction('action:api.video.viewed', { video, ip })
|
||||||
|
|
||||||
|
return res.sendStatus(204)
|
||||||
}
|
}
|
||||||
|
|
||||||
async function getVideoDescription (req: express.Request, res: express.Response) {
|
async function getVideoDescription (req: express.Request, res: express.Response) {
|
||||||
|
|
|
@ -316,7 +316,11 @@ const CONSTRAINTS_FIELDS = {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let VIDEO_VIEW_LIFETIME = 60000 * 60 // 1 hour
|
let VIEW_LIFETIME = {
|
||||||
|
VIDEO: 60000 * 60, // 1 hour
|
||||||
|
LIVE: 60000 * 5 // 5 minutes
|
||||||
|
}
|
||||||
|
|
||||||
let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
|
let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
|
||||||
|
|
||||||
const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = {
|
const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = {
|
||||||
|
@ -726,7 +730,8 @@ if (isTestInstance() === true) {
|
||||||
|
|
||||||
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
|
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
|
||||||
|
|
||||||
VIDEO_VIEW_LIFETIME = 1000 // 1 second
|
VIEW_LIFETIME.VIDEO = 1000 // 1 second
|
||||||
|
VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
|
||||||
CONTACT_FORM_LIFETIME = 1000 // 1 second
|
CONTACT_FORM_LIFETIME = 1000 // 1 second
|
||||||
|
|
||||||
JOB_ATTEMPTS['email'] = 1
|
JOB_ATTEMPTS['email'] = 1
|
||||||
|
@ -838,7 +843,7 @@ export {
|
||||||
JOB_COMPLETED_LIFETIME,
|
JOB_COMPLETED_LIFETIME,
|
||||||
HTTP_SIGNATURE,
|
HTTP_SIGNATURE,
|
||||||
VIDEO_IMPORT_STATES,
|
VIDEO_IMPORT_STATES,
|
||||||
VIDEO_VIEW_LIFETIME,
|
VIEW_LIFETIME,
|
||||||
CONTACT_FORM_LIFETIME,
|
CONTACT_FORM_LIFETIME,
|
||||||
VIDEO_PLAYLIST_PRIVACIES,
|
VIDEO_PLAYLIST_PRIVACIES,
|
||||||
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME,
|
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME,
|
||||||
|
|
|
@ -4,6 +4,7 @@ import { Redis } from '../../redis'
|
||||||
import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
|
import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
|
||||||
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
|
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
|
||||||
import { MActorSignature } from '../../../types/models'
|
import { MActorSignature } from '../../../types/models'
|
||||||
|
import { LiveManager } from '@server/lib/live-manager'
|
||||||
|
|
||||||
async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) {
|
async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) {
|
||||||
const { activity, byActor } = options
|
const { activity, byActor } = options
|
||||||
|
@ -19,19 +20,27 @@ export {
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) {
|
async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) {
|
||||||
const videoObject = activity.type === 'View' ? activity.object : (activity.object as ViewObject).object
|
const videoObject = activity.type === 'View'
|
||||||
|
? activity.object
|
||||||
|
: (activity.object as ViewObject).object
|
||||||
|
|
||||||
const options = {
|
const options = {
|
||||||
videoObject,
|
videoObject,
|
||||||
fetchType: 'only-immutable-attributes' as 'only-immutable-attributes',
|
fetchType: 'only-video' as 'only-video',
|
||||||
allowRefresh: false as false
|
allowRefresh: false as false
|
||||||
}
|
}
|
||||||
const { video } = await getOrCreateVideoAndAccountAndChannel(options)
|
const { video } = await getOrCreateVideoAndAccountAndChannel(options)
|
||||||
|
|
||||||
|
if (video.isOwned()) {
|
||||||
|
// Our live manager will increment the counter and send the view to followers
|
||||||
|
if (video.isLive) {
|
||||||
|
LiveManager.Instance.addViewTo(video.id)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
await Redis.Instance.addVideoView(video.id)
|
await Redis.Instance.addVideoView(video.id)
|
||||||
|
|
||||||
if (video.isOwned()) {
|
// Forward the view but don't resend the activity to the sender
|
||||||
// Don't resend the activity to the sender
|
|
||||||
const exceptions = [ byActor ]
|
const exceptions = [ byActor ]
|
||||||
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
|
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,8 @@ async function saveLive (video: MVideo, live: MVideoLive) {
|
||||||
await live.destroy()
|
await live.destroy()
|
||||||
|
|
||||||
video.isLive = false
|
video.isLive = false
|
||||||
|
// Reinit views
|
||||||
|
video.views = 0
|
||||||
video.state = VideoState.TO_TRANSCODE
|
video.state = VideoState.TO_TRANSCODE
|
||||||
video.duration = duration
|
video.duration = duration
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@ import {
|
||||||
} from '@server/helpers/ffmpeg-utils'
|
} from '@server/helpers/ffmpeg-utils'
|
||||||
import { logger } from '@server/helpers/logger'
|
import { logger } from '@server/helpers/logger'
|
||||||
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
|
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
|
||||||
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
|
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
|
||||||
import { UserModel } from '@server/models/account/user'
|
import { UserModel } from '@server/models/account/user'
|
||||||
import { VideoModel } from '@server/models/video/video'
|
import { VideoModel } from '@server/models/video/video'
|
||||||
import { VideoFileModel } from '@server/models/video/video-file'
|
import { VideoFileModel } from '@server/models/video/video-file'
|
||||||
|
@ -61,6 +61,8 @@ class LiveManager {
|
||||||
|
|
||||||
private readonly transSessions = new Map<string, FfmpegCommand>()
|
private readonly transSessions = new Map<string, FfmpegCommand>()
|
||||||
private readonly videoSessions = new Map<number, string>()
|
private readonly videoSessions = new Map<number, string>()
|
||||||
|
// Values are Date().getTime()
|
||||||
|
private readonly watchersPerVideo = new Map<number, number[]>()
|
||||||
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
|
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
|
||||||
private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
|
private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
|
||||||
|
|
||||||
|
@ -115,6 +117,8 @@ class LiveManager {
|
||||||
this.stop()
|
this.stop()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
|
||||||
}
|
}
|
||||||
|
|
||||||
run () {
|
run () {
|
||||||
|
@ -131,6 +135,10 @@ class LiveManager {
|
||||||
this.rtmpServer = undefined
|
this.rtmpServer = undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
|
isRunning () {
|
||||||
|
return !!this.rtmpServer
|
||||||
|
}
|
||||||
|
|
||||||
getSegmentsSha256 (videoUUID: string) {
|
getSegmentsSha256 (videoUUID: string) {
|
||||||
return this.segmentsSha256.get(videoUUID)
|
return this.segmentsSha256.get(videoUUID)
|
||||||
}
|
}
|
||||||
|
@ -150,6 +158,19 @@ class LiveManager {
|
||||||
return currentLives.reduce((sum, obj) => sum + obj.size, 0)
|
return currentLives.reduce((sum, obj) => sum + obj.size, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addViewTo (videoId: number) {
|
||||||
|
if (this.videoSessions.has(videoId) === false) return
|
||||||
|
|
||||||
|
let watchers = this.watchersPerVideo.get(videoId)
|
||||||
|
|
||||||
|
if (!watchers) {
|
||||||
|
watchers = []
|
||||||
|
this.watchersPerVideo.set(videoId, watchers)
|
||||||
|
}
|
||||||
|
|
||||||
|
watchers.push(new Date().getTime())
|
||||||
|
}
|
||||||
|
|
||||||
private getContext () {
|
private getContext () {
|
||||||
return context
|
return context
|
||||||
}
|
}
|
||||||
|
@ -331,6 +352,7 @@ class LiveManager {
|
||||||
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)
|
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)
|
||||||
|
|
||||||
this.transSessions.delete(sessionId)
|
this.transSessions.delete(sessionId)
|
||||||
|
this.watchersPerVideo.delete(videoLive.videoId)
|
||||||
|
|
||||||
Promise.all([ tsWatcher.close(), masterWatcher.close() ])
|
Promise.all([ tsWatcher.close(), masterWatcher.close() ])
|
||||||
.catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
|
.catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
|
||||||
|
@ -426,6 +448,32 @@ class LiveManager {
|
||||||
return this.isAbleToUploadVideoWithCache(user.id)
|
return this.isAbleToUploadVideoWithCache(user.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async updateLiveViews () {
|
||||||
|
if (!this.isRunning()) return
|
||||||
|
|
||||||
|
logger.info('Updating live video views.')
|
||||||
|
|
||||||
|
for (const videoId of this.watchersPerVideo.keys()) {
|
||||||
|
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
|
||||||
|
|
||||||
|
const watchers = this.watchersPerVideo.get(videoId)
|
||||||
|
|
||||||
|
const numWatchers = watchers.length
|
||||||
|
|
||||||
|
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
|
||||||
|
video.views = numWatchers
|
||||||
|
await video.save()
|
||||||
|
|
||||||
|
await federateVideoIfNeeded(video, false)
|
||||||
|
|
||||||
|
// Only keep not expired watchers
|
||||||
|
const newWatchers = watchers.filter(w => w > notBefore)
|
||||||
|
this.watchersPerVideo.set(videoId, newWatchers)
|
||||||
|
|
||||||
|
logger.debug('New live video views for %s is %d.', video.url, numWatchers)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static get Instance () {
|
static get Instance () {
|
||||||
return this.instance || (this.instance = new this())
|
return this.instance || (this.instance = new this())
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ import {
|
||||||
USER_EMAIL_VERIFY_LIFETIME,
|
USER_EMAIL_VERIFY_LIFETIME,
|
||||||
USER_PASSWORD_RESET_LIFETIME,
|
USER_PASSWORD_RESET_LIFETIME,
|
||||||
USER_PASSWORD_CREATE_LIFETIME,
|
USER_PASSWORD_CREATE_LIFETIME,
|
||||||
VIDEO_VIEW_LIFETIME,
|
VIEW_LIFETIME,
|
||||||
WEBSERVER,
|
WEBSERVER,
|
||||||
TRACKER_RATE_LIMITS
|
TRACKER_RATE_LIMITS
|
||||||
} from '../initializers/constants'
|
} from '../initializers/constants'
|
||||||
|
@ -118,8 +118,12 @@ class Redis {
|
||||||
|
|
||||||
/* ************ Views per IP ************ */
|
/* ************ Views per IP ************ */
|
||||||
|
|
||||||
setIPVideoView (ip: string, videoUUID: string) {
|
setIPVideoView (ip: string, videoUUID: string, isLive: boolean) {
|
||||||
return this.setValue(this.generateViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME)
|
const lifetime = isLive
|
||||||
|
? VIEW_LIFETIME.LIVE
|
||||||
|
: VIEW_LIFETIME.VIDEO
|
||||||
|
|
||||||
|
return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime)
|
||||||
}
|
}
|
||||||
|
|
||||||
async doesVideoIPViewExist (ip: string, videoUUID: string) {
|
async doesVideoIPViewExist (ip: string, videoUUID: string) {
|
||||||
|
|
|
@ -28,9 +28,12 @@ import {
|
||||||
testImage,
|
testImage,
|
||||||
updateCustomSubConfig,
|
updateCustomSubConfig,
|
||||||
updateLive,
|
updateLive,
|
||||||
|
viewVideo,
|
||||||
|
wait,
|
||||||
waitJobs,
|
waitJobs,
|
||||||
waitUntilLiveStarts
|
waitUntilLiveStarts
|
||||||
} from '../../../../shared/extra-utils'
|
} from '../../../../shared/extra-utils'
|
||||||
|
import { FfmpegCommand } from 'fluent-ffmpeg'
|
||||||
|
|
||||||
const expect = chai.expect
|
const expect = chai.expect
|
||||||
|
|
||||||
|
@ -419,6 +422,80 @@ describe('Test live', function () {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe('Live views', function () {
|
||||||
|
let liveVideoId: string
|
||||||
|
let command: FfmpegCommand
|
||||||
|
|
||||||
|
async function countViews (expected: number) {
|
||||||
|
for (const server of servers) {
|
||||||
|
const res = await getVideo(server.url, liveVideoId)
|
||||||
|
const video: VideoDetails = res.body
|
||||||
|
|
||||||
|
expect(video.views).to.equal(expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
before(async function () {
|
||||||
|
this.timeout(30000)
|
||||||
|
|
||||||
|
const liveAttributes = {
|
||||||
|
name: 'live video',
|
||||||
|
channelId: servers[0].videoChannel.id,
|
||||||
|
privacy: VideoPrivacy.PUBLIC
|
||||||
|
}
|
||||||
|
|
||||||
|
const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes)
|
||||||
|
liveVideoId = res.body.video.uuid
|
||||||
|
|
||||||
|
command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
|
||||||
|
await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId)
|
||||||
|
await waitJobs(servers)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should display no views for a live', async function () {
|
||||||
|
await countViews(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should view a live twice and display 1 view', async function () {
|
||||||
|
this.timeout(30000)
|
||||||
|
|
||||||
|
await viewVideo(servers[0].url, liveVideoId)
|
||||||
|
await viewVideo(servers[0].url, liveVideoId)
|
||||||
|
|
||||||
|
await wait(5000)
|
||||||
|
|
||||||
|
await waitJobs(servers)
|
||||||
|
|
||||||
|
await countViews(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should wait 5 seconds and display 0 views', async function () {
|
||||||
|
this.timeout(30000)
|
||||||
|
|
||||||
|
await wait(5000)
|
||||||
|
await waitJobs(servers)
|
||||||
|
|
||||||
|
await countViews(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should view a live on a remote and on local and display 2 views', async function () {
|
||||||
|
this.timeout(30000)
|
||||||
|
|
||||||
|
await viewVideo(servers[0].url, liveVideoId)
|
||||||
|
await viewVideo(servers[1].url, liveVideoId)
|
||||||
|
await viewVideo(servers[1].url, liveVideoId)
|
||||||
|
|
||||||
|
await wait(5000)
|
||||||
|
await waitJobs(servers)
|
||||||
|
|
||||||
|
await countViews(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async function () {
|
||||||
|
await stopFfmpeg(command)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
describe('Live socket messages', function () {
|
describe('Live socket messages', function () {
|
||||||
|
|
||||||
async function createLiveWrapper () {
|
async function createLiveWrapper () {
|
||||||
|
|
Loading…
Reference in New Issue