Live views update
This commit is contained in:
parent
5cac83a78d
commit
a800dbf345
|
@ -25,6 +25,7 @@ import { VideoActionsDisplayType, VideoDownloadComponent } from '@app/shared/sha
|
||||||
import { VideoPlaylist, VideoPlaylistService } from '@app/shared/shared-video-playlist'
|
import { VideoPlaylist, VideoPlaylistService } from '@app/shared/shared-video-playlist'
|
||||||
import { MetaService } from '@ngx-meta/core'
|
import { MetaService } from '@ngx-meta/core'
|
||||||
import { peertubeLocalStorage } from '@root-helpers/peertube-web-storage'
|
import { peertubeLocalStorage } from '@root-helpers/peertube-web-storage'
|
||||||
|
import { HttpStatusCode } from '@shared/core-utils/miscs/http-error-codes'
|
||||||
import { ServerConfig, ServerErrorCode, UserVideoRateType, VideoCaption, VideoPrivacy, VideoState } from '@shared/models'
|
import { ServerConfig, ServerErrorCode, UserVideoRateType, VideoCaption, VideoPrivacy, VideoState } from '@shared/models'
|
||||||
import { getStoredP2PEnabled, getStoredTheater } from '../../../assets/player/peertube-player-local-storage'
|
import { getStoredP2PEnabled, getStoredTheater } from '../../../assets/player/peertube-player-local-storage'
|
||||||
import {
|
import {
|
||||||
|
@ -39,7 +40,6 @@ import { isWebRTCDisabled, timeToInt } from '../../../assets/player/utils'
|
||||||
import { environment } from '../../../environments/environment'
|
import { environment } from '../../../environments/environment'
|
||||||
import { VideoSupportComponent } from './modal/video-support.component'
|
import { VideoSupportComponent } from './modal/video-support.component'
|
||||||
import { VideoWatchPlaylistComponent } from './video-watch-playlist.component'
|
import { VideoWatchPlaylistComponent } from './video-watch-playlist.component'
|
||||||
import { HttpStatusCode } from '@shared/core-utils/miscs/http-error-codes'
|
|
||||||
|
|
||||||
type URLOptions = CustomizationOptions & { playerMode: PlayerMode }
|
type URLOptions = CustomizationOptions & { playerMode: PlayerMode }
|
||||||
|
|
||||||
|
@ -866,9 +866,28 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
|
||||||
|
|
||||||
private async subscribeToLiveEventsIfNeeded (oldVideo: VideoDetails, newVideo: VideoDetails) {
|
private async subscribeToLiveEventsIfNeeded (oldVideo: VideoDetails, newVideo: VideoDetails) {
|
||||||
if (!this.liveVideosSub) {
|
if (!this.liveVideosSub) {
|
||||||
this.liveVideosSub = this.peertubeSocket.getLiveVideosObservable()
|
this.liveVideosSub = this.buildLiveEventsSubscription()
|
||||||
.subscribe(({ payload }) => {
|
}
|
||||||
if (payload.state !== VideoState.PUBLISHED) return
|
|
||||||
|
if (oldVideo && oldVideo.id !== newVideo.id) {
|
||||||
|
await this.peertubeSocket.unsubscribeLiveVideos(oldVideo.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!newVideo.isLive) return
|
||||||
|
|
||||||
|
await this.peertubeSocket.subscribeToLiveVideosSocket(newVideo.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
private buildLiveEventsSubscription () {
|
||||||
|
return this.peertubeSocket.getLiveVideosObservable()
|
||||||
|
.subscribe(({ type, payload }) => {
|
||||||
|
if (type === 'state-change') return this.handleLiveStateChange(payload.state)
|
||||||
|
if (type === 'views-change') return this.handleLiveViewsChange(payload.views)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
private handleLiveStateChange (newState: VideoState) {
|
||||||
|
if (newState !== VideoState.PUBLISHED) return
|
||||||
|
|
||||||
const videoState = this.video.state.id
|
const videoState = this.video.state.id
|
||||||
if (videoState !== VideoState.WAITING_FOR_LIVE && videoState !== VideoState.LIVE_ENDED) return
|
if (videoState !== VideoState.WAITING_FOR_LIVE && videoState !== VideoState.LIVE_ENDED) return
|
||||||
|
@ -880,16 +899,15 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
|
||||||
// Reset to refetch the video
|
// Reset to refetch the video
|
||||||
this.video = undefined
|
this.video = undefined
|
||||||
this.loadVideo(videoUUID)
|
this.loadVideo(videoUUID)
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oldVideo && oldVideo.id !== newVideo.id) {
|
private handleLiveViewsChange (newViews: number) {
|
||||||
await this.peertubeSocket.unsubscribeLiveVideos(oldVideo.id)
|
if (!this.video) {
|
||||||
|
console.error('Cannot update video live views because video is no defined.')
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!newVideo.isLive) return
|
this.video.views = newViews
|
||||||
|
|
||||||
await this.peertubeSocket.subscribeToLiveVideosSocket(newVideo.id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private initHotkeys () {
|
private initHotkeys () {
|
||||||
|
|
|
@ -73,9 +73,12 @@ export class PeerTubeSocket {
|
||||||
this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos')
|
this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos')
|
||||||
})
|
})
|
||||||
|
|
||||||
const type: LiveVideoEventType = 'state-change'
|
const types: LiveVideoEventType[] = [ 'views-change', 'state-change' ]
|
||||||
|
|
||||||
|
for (const type of types) {
|
||||||
this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => this.dispatchLiveVideoEvent(type, payload))
|
this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => this.dispatchLiveVideoEvent(type, payload))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private async importIOIfNeeded () {
|
private async importIOIfNeeded () {
|
||||||
if (this.io) return
|
if (this.io) return
|
||||||
|
|
|
@ -461,8 +461,13 @@ async function updateVideoFromAP (options: {
|
||||||
transaction: undefined
|
transaction: undefined
|
||||||
})
|
})
|
||||||
|
|
||||||
if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users?
|
// Notify our users?
|
||||||
if (videoUpdated.isLive) PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
|
if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated)
|
||||||
|
|
||||||
|
if (videoUpdated.isLive) {
|
||||||
|
PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
|
||||||
|
PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
|
||||||
|
}
|
||||||
|
|
||||||
logger.info('Remote video with uuid %s updated', videoObject.uuid)
|
logger.info('Remote video with uuid %s updated', videoObject.uuid)
|
||||||
|
|
||||||
|
|
|
@ -91,7 +91,7 @@ async function saveLive (video: MVideo, live: MVideoLive) {
|
||||||
await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
|
await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
|
||||||
hlsPlaylist.VideoFiles = []
|
hlsPlaylist.VideoFiles = []
|
||||||
|
|
||||||
let durationDone: boolean
|
let durationDone = false
|
||||||
|
|
||||||
for (const playlistFile of playlistFiles) {
|
for (const playlistFile of playlistFiles) {
|
||||||
const concatenatedTsFile = LiveManager.Instance.buildConcatenatedName(playlistFile)
|
const concatenatedTsFile = LiveManager.Instance.buildConcatenatedName(playlistFile)
|
||||||
|
|
|
@ -537,6 +537,8 @@ class LiveManager {
|
||||||
|
|
||||||
await federateVideoIfNeeded(video, false)
|
await federateVideoIfNeeded(video, false)
|
||||||
|
|
||||||
|
PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
|
||||||
|
|
||||||
// Only keep not expired watchers
|
// Only keep not expired watchers
|
||||||
const newWatchers = watchers.filter(w => w > notBefore)
|
const newWatchers = watchers.filter(w => w > notBefore)
|
||||||
this.watchersPerVideo.set(videoId, newWatchers)
|
this.watchersPerVideo.set(videoId, newWatchers)
|
||||||
|
|
|
@ -69,7 +69,18 @@ class PeerTubeSocket {
|
||||||
const data: LiveVideoEventPayload = { state: video.state }
|
const data: LiveVideoEventPayload = { state: video.state }
|
||||||
const type: LiveVideoEventType = 'state-change'
|
const type: LiveVideoEventType = 'state-change'
|
||||||
|
|
||||||
logger.debug('Sending video live new state notification of %s.', video.url)
|
logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state })
|
||||||
|
|
||||||
|
this.liveVideosNamespace
|
||||||
|
.in(video.id)
|
||||||
|
.emit(type, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
sendVideoViewsUpdate (video: MVideo) {
|
||||||
|
const data: LiveVideoEventPayload = { views: video.views }
|
||||||
|
const type: LiveVideoEventType = 'views-change'
|
||||||
|
|
||||||
|
logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views })
|
||||||
|
|
||||||
this.liveVideosNamespace
|
this.liveVideosNamespace
|
||||||
.in(video.id)
|
.in(video.id)
|
||||||
|
|
|
@ -328,7 +328,7 @@ describe('Test live', function () {
|
||||||
await checkResolutionsInMasterPlaylist(hlsPlaylist.playlistUrl, resolutions)
|
await checkResolutionsInMasterPlaylist(hlsPlaylist.playlistUrl, resolutions)
|
||||||
|
|
||||||
for (let i = 0; i < resolutions.length; i++) {
|
for (let i = 0; i < resolutions.length; i++) {
|
||||||
const segmentNum = 2
|
const segmentNum = 3
|
||||||
const segmentName = `${i}-00000${segmentNum}.ts`
|
const segmentName = `${i}-00000${segmentNum}.ts`
|
||||||
await waitUntilLiveSegmentGeneration(servers[0], video.uuid, i, segmentNum)
|
await waitUntilLiveSegmentGeneration(servers[0], video.uuid, i, segmentNum)
|
||||||
|
|
||||||
|
@ -608,6 +608,55 @@ describe('Test live', function () {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('Should correctly send views change notification', async function () {
|
||||||
|
this.timeout(60000)
|
||||||
|
|
||||||
|
let localLastVideoViews = 0
|
||||||
|
let remoteLastVideoViews = 0
|
||||||
|
|
||||||
|
const liveVideoUUID = await createLiveWrapper()
|
||||||
|
await waitJobs(servers)
|
||||||
|
|
||||||
|
{
|
||||||
|
const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID)
|
||||||
|
|
||||||
|
const localSocket = getLiveNotificationSocket(servers[0].url)
|
||||||
|
localSocket.on('views-change', data => { localLastVideoViews = data.views })
|
||||||
|
localSocket.emit('subscribe', { videoId })
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID)
|
||||||
|
|
||||||
|
const remoteSocket = getLiveNotificationSocket(servers[1].url)
|
||||||
|
remoteSocket.on('views-change', data => { remoteLastVideoViews = data.views })
|
||||||
|
remoteSocket.emit('subscribe', { videoId })
|
||||||
|
}
|
||||||
|
|
||||||
|
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID)
|
||||||
|
|
||||||
|
for (const server of servers) {
|
||||||
|
await waitUntilLivePublished(server.url, server.accessToken, liveVideoUUID)
|
||||||
|
}
|
||||||
|
|
||||||
|
await waitJobs(servers)
|
||||||
|
|
||||||
|
expect(localLastVideoViews).to.equal(0)
|
||||||
|
expect(remoteLastVideoViews).to.equal(0)
|
||||||
|
|
||||||
|
await viewVideo(servers[0].url, liveVideoUUID)
|
||||||
|
await viewVideo(servers[1].url, liveVideoUUID)
|
||||||
|
|
||||||
|
await waitJobs(servers)
|
||||||
|
await wait(5000)
|
||||||
|
await waitJobs(servers)
|
||||||
|
|
||||||
|
expect(localLastVideoViews).to.equal(2)
|
||||||
|
expect(remoteLastVideoViews).to.equal(2)
|
||||||
|
|
||||||
|
await stopFfmpeg(command)
|
||||||
|
})
|
||||||
|
|
||||||
it('Should not receive a notification after unsubscribe', async function () {
|
it('Should not receive a notification after unsubscribe', async function () {
|
||||||
this.timeout(60000)
|
this.timeout(60000)
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import { VideoState } from '../video-state.enum'
|
import { VideoState } from '../video-state.enum'
|
||||||
|
|
||||||
export interface LiveVideoEventPayload {
|
export interface LiveVideoEventPayload {
|
||||||
state: VideoState
|
state?: VideoState
|
||||||
|
views?: number
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
export type LiveVideoEventType = 'state-change'
|
export type LiveVideoEventType = 'state-change' | 'views-change'
|
||||||
|
|
Loading…
Reference in New Issue