From 25684e837c2022c7adee77998f4adcb6cf0b7a03 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 9 Aug 2024 09:42:58 +0200 Subject: [PATCH] Fix client player error on fast restream --- .../+video-watch/video-watch.component.ts | 19 ++++--- .../notification/peertube-socket.service.ts | 2 +- .../shared/p2p-media-loader/hls-plugin.ts | 17 ++++++- .../p2p-media-loader-plugin.ts | 9 +--- .../redundancy-url-manager.ts | 7 ++- .../p2p-media-loader/segment-validator.ts | 2 +- client/src/standalone/videos/embed.ts | 19 ++++--- .../standalone/videos/shared/live-manager.ts | 32 +++++++++--- .../src/videos/live/live-video-event.type.ts | 2 +- .../src/api/live/live-socket-messages.ts | 50 +++++++++++++++++-- server/core/lib/live/live-manager.ts | 2 + server/core/lib/peertube-socket.ts | 21 +++++++- 12 files changed, 141 insertions(+), 41 deletions(-) diff --git a/client/src/app/+videos/+video-watch/video-watch.component.ts b/client/src/app/+videos/+video-watch/video-watch.component.ts index 3121a4723..2bc23a2be 100644 --- a/client/src/app/+videos/+video-watch/video-watch.component.ts +++ b/client/src/app/+videos/+video-watch/video-watch.component.ts @@ -562,14 +562,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { if (this.video.isLive) { player.one('ended', () => { - this.zone.run(() => { - // We changed the video, it's not a live anymore - if (!this.video.isLive) return - - this.video.state.id = VideoState.LIVE_ENDED - - this.updatePlayerOnNoLive() - }) + this.zone.run(() => this.endLive()) }) } @@ -884,6 +877,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { .subscribe(({ type, payload }) => { if (type === 'state-change') return this.handleLiveStateChange(payload.state) if (type === 'views-change') return this.handleLiveViewsChange(payload.viewers) + if (type === 'force-end') return this.endLive() }) } @@ -992,4 +986,13 @@ export class VideoWatchComponent implements OnInit, OnDestroy { peertubeLink: false } } + + private endLive () { + // We changed the video, it's not a live anymore + if (!this.video.isLive) return + + this.video.state.id = VideoState.LIVE_ENDED + + this.updatePlayerOnNoLive() + } } diff --git a/client/src/app/core/notification/peertube-socket.service.ts b/client/src/app/core/notification/peertube-socket.service.ts index 15af9a310..e0e470af0 100644 --- a/client/src/app/core/notification/peertube-socket.service.ts +++ b/client/src/app/core/notification/peertube-socket.service.ts @@ -68,7 +68,7 @@ export class PeerTubeSocket { this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos') - const types: LiveVideoEventType[] = [ 'views-change', 'state-change' ] + const types: LiveVideoEventType[] = [ 'views-change', 'state-change', 'force-end' ] for (const type of types) { this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => { diff --git a/client/src/assets/player/shared/p2p-media-loader/hls-plugin.ts b/client/src/assets/player/shared/p2p-media-loader/hls-plugin.ts index e158d650c..3096d41ae 100644 --- a/client/src/assets/player/shared/p2p-media-loader/hls-plugin.ts +++ b/client/src/assets/player/shared/p2p-media-loader/hls-plugin.ts @@ -130,6 +130,8 @@ export class Html5Hlsjs { private dvrDuration: number = null private edgeMargin: number = null + private liveEnded = false + private handlers: { [ id in 'play' | 'error' ]: EventListener } = { play: null, error: null @@ -260,6 +262,16 @@ export class Html5Hlsjs { private _handleNetworkError (error: any) { if (navigator.onLine === false) return + // We may have errors if the live ended because of a fast-restream in the same permanent live + if (this.liveEnded) { + logger.info('Forcing end of live stream after a network error'); + + (this.player as any)?.handleTechEnded_() + this.hls?.stopLoad() + + return + } + if (this.errorCounts[Hlsjs.ErrorTypes.NETWORK_ERROR] <= this.maxNetworkErrorRecovery) { logger.info('trying to recover network error') @@ -383,6 +395,8 @@ export class Html5Hlsjs { } private initialize () { + this.liveEnded = false + this.buildBaseConfig() if ([ '', 'auto' ].includes(this.videoElement.preload) && !this.videoElement.autoplay && this.hlsjsConfig.autoStartLoad === undefined) { @@ -403,7 +417,7 @@ export class Html5Hlsjs { this.hls.on(Hlsjs.Events.ERROR, (event, data) => this._onError(event, data)) this.hls.on(Hlsjs.Events.MANIFEST_PARSED, (event, data) => this._onMetaData(event, data)) - this.hls.on(Hlsjs.Events.LEVEL_LOADED, (event, data) => { + this.hls.on(Hlsjs.Events.LEVEL_LOADED, (_event, data) => { // The DVR plugin will auto seek to "live edge" on start up if (this.hlsjsConfig.liveSyncDuration) { this.edgeMargin = this.hlsjsConfig.liveSyncDuration @@ -412,6 +426,7 @@ export class Html5Hlsjs { } if (this.isLive && !data.details.live) { + this.liveEnded = true this.player.trigger('hlsjs-live-ended') } diff --git a/client/src/assets/player/shared/p2p-media-loader/p2p-media-loader-plugin.ts b/client/src/assets/player/shared/p2p-media-loader/p2p-media-loader-plugin.ts index e217d06da..54f43d9ae 100644 --- a/client/src/assets/player/shared/p2p-media-loader/p2p-media-loader-plugin.ts +++ b/client/src/assets/player/shared/p2p-media-loader/p2p-media-loader-plugin.ts @@ -141,14 +141,9 @@ class P2pMediaLoaderPlugin extends Plugin { initHlsJsPlayer(this.player, this.hlsjs) this.p2pEngine.on(Events.SegmentError, (segment: Segment, err) => { - if (navigator.onLine === false) return - // We may have errors if the live ended because of a fast-restream in the same permanent live - if (this.liveEnded) { - (this.player as any).handleTechEnded_() - return - } + if (navigator.onLine === false || this.liveEnded) return - logger.error(`Segment ${segment.id} error.`, err) + logger.clientError(`Segment ${segment.id} error.`, err) if (this.options.redundancyUrlManager) { this.options.redundancyUrlManager.removeBySegmentUrl(segment.requestUrl) diff --git a/client/src/assets/player/shared/p2p-media-loader/redundancy-url-manager.ts b/client/src/assets/player/shared/p2p-media-loader/redundancy-url-manager.ts index 778cbeba1..ab051ca44 100644 --- a/client/src/assets/player/shared/p2p-media-loader/redundancy-url-manager.ts +++ b/client/src/assets/player/shared/p2p-media-loader/redundancy-url-manager.ts @@ -7,11 +7,14 @@ class RedundancyUrlManager { } removeBySegmentUrl (segmentUrl: string) { - logger.info(`Removing redundancy of segment URL ${segmentUrl}.`) - const baseUrl = getBaseUrl(segmentUrl) + const oldLength = baseUrl.length this.baseUrls = this.baseUrls.filter(u => u !== baseUrl && u !== baseUrl + '/') + + if (oldLength !== this.baseUrls.length) { + logger.info(`Removed redundancy of segment URL ${segmentUrl}.`) + } } buildUrl (url: string) { diff --git a/client/src/assets/player/shared/p2p-media-loader/segment-validator.ts b/client/src/assets/player/shared/p2p-media-loader/segment-validator.ts index 0a3dbef72..e5f663117 100644 --- a/client/src/assets/player/shared/p2p-media-loader/segment-validator.ts +++ b/client/src/assets/player/shared/p2p-media-loader/segment-validator.ts @@ -70,7 +70,7 @@ export class SegmentValidator { throw new Error(`Unknown segment name ${filename}/${range} in segment validator`) } - debugLogger(`Validating ${filename} range ${segment.range}`) + debugLogger(`Validating ${filename}` + (segment.range ? ` range ${segment.range}` : '')) const calculatedSha = await this.sha256Hex(segment.data) if (calculatedSha !== hashShouldBe) { diff --git a/client/src/standalone/videos/embed.ts b/client/src/standalone/videos/embed.ts index 4c39ac2c7..0e926191a 100644 --- a/client/src/standalone/videos/embed.ts +++ b/client/src/standalone/videos/embed.ts @@ -317,17 +317,20 @@ export class PeerTubeEmbed { if (video.isLive) { this.liveManager.listenForChanges({ video, + onPublishedVideo: () => { this.liveManager.stopListeningForChanges(video) this.loadVideoAndBuildPlayer({ uuid: video.uuid, forceAutoplay: true }) - } + }, + + onForceEnd: () => this.endLive(video, translations) }) if (video.state.id === VideoState.WAITING_FOR_LIVE || video.state.id === VideoState.LIVE_ENDED) { this.liveManager.displayInfo({ state: video.state.id, translations }) this.peertubePlayer.disable() } else { - this.correctlyHandleLiveEnding(translations) + this.player.one('ended', () => this.endLive(video, translations)) } } @@ -369,13 +372,13 @@ export class PeerTubeEmbed { // --------------------------------------------------------------------------- - private correctlyHandleLiveEnding (translations: Translations) { - this.player.one('ended', () => { - // Display the live ended information - this.liveManager.displayInfo({ state: VideoState.LIVE_ENDED, translations }) + private endLive (video: VideoDetails, translations: Translations) { + // Display the live ended information + this.liveManager.displayInfo({ state: VideoState.LIVE_ENDED, translations }) - this.peertubePlayer.disable() - }) + this.peertubePlayer.unload() + this.peertubePlayer.disable() + this.peertubePlayer.setPoster(video.previewPath) } private async handlePasswordError (err: PeerTubeServerError) { diff --git a/client/src/standalone/videos/shared/live-manager.ts b/client/src/standalone/videos/shared/live-manager.ts index a1fadd6d4..a3db30e94 100644 --- a/client/src/standalone/videos/shared/live-manager.ts +++ b/client/src/standalone/videos/shared/live-manager.ts @@ -7,7 +7,8 @@ import { getBackendUrl } from './url' export class LiveManager { private liveSocket: Socket - private listeners = new Map void>() + private stateChangeListeners = new Map void>() + private forceEndListeners = new Map void>() constructor ( private readonly playerHTML: PlayerHTML @@ -17,16 +18,19 @@ export class LiveManager { async listenForChanges (options: { video: VideoDetails + onPublishedVideo: () => any + + onForceEnd: () => any }) { - const { video, onPublishedVideo } = options + const { video, onPublishedVideo, onForceEnd } = options if (!this.liveSocket) { const io = (await import('socket.io-client')).io this.liveSocket = io(getBackendUrl() + '/live-videos') } - const listener = (payload: LiveVideoEventPayload) => { + const stateChangeListener = (payload: LiveVideoEventPayload) => { if (payload.state === VideoState.PUBLISHED) { this.playerHTML.removeInformation() onPublishedVideo() @@ -34,16 +38,28 @@ export class LiveManager { } } - this.liveSocket.on('state-change', listener) - this.listeners.set(video.uuid, listener) + const forceEndListener = () => { + onForceEnd() + } + + this.liveSocket.on('state-change', stateChangeListener) + this.liveSocket.on('force-end', forceEndListener) + + this.stateChangeListeners.set(video.uuid, stateChangeListener) + this.forceEndListeners.set(video.uuid, forceEndListener) this.liveSocket.emit('subscribe', { videoId: video.id }) } stopListeningForChanges (video: VideoDetails) { - const listener = this.listeners.get(video.uuid) - if (listener) { - this.liveSocket.off('state-change', listener) + { + const listener = this.stateChangeListeners.get(video.uuid) + if (listener) this.liveSocket.off('state-change', listener) + } + + { + const listener = this.forceEndListeners.get(video.uuid) + if (listener) this.liveSocket.off('force-end', listener) } this.liveSocket.emit('unsubscribe', { videoId: video.id }) diff --git a/packages/models/src/videos/live/live-video-event.type.ts b/packages/models/src/videos/live/live-video-event.type.ts index 50f794561..4e9634537 100644 --- a/packages/models/src/videos/live/live-video-event.type.ts +++ b/packages/models/src/videos/live/live-video-event.type.ts @@ -1 +1 @@ -export type LiveVideoEventType = 'state-change' | 'views-change' +export type LiveVideoEventType = 'state-change' | 'views-change' | 'force-end' diff --git a/packages/tests/src/api/live/live-socket-messages.ts b/packages/tests/src/api/live/live-socket-messages.ts index d055ba605..854ba7ccf 100644 --- a/packages/tests/src/api/live/live-socket-messages.ts +++ b/packages/tests/src/api/live/live-socket-messages.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ import { wait } from '@peertube/peertube-core-utils' -import { LiveVideoEventPayload, VideoPrivacy, VideoState, VideoStateType } from '@peertube/peertube-models' +import { LiveVideoCreate, LiveVideoEventPayload, VideoPrivacy, VideoState, VideoStateType } from '@peertube/peertube-models' import { PeerTubeServer, cleanupTests, @@ -36,11 +36,13 @@ describe('Test live socket messages', function () { describe('Live socket messages', function () { - async function createLiveWrapper () { + async function createLiveWrapper (options: Partial = {}) { const liveAttributes = { name: 'live video', channelId: servers[0].store.channel.id, - privacy: VideoPrivacy.PUBLIC + privacy: VideoPrivacy.PUBLIC, + + ...options } const { uuid } = await servers[0].live.create({ fields: liveAttributes }) @@ -173,6 +175,48 @@ describe('Test live socket messages', function () { expect(stateChanges).to.have.lengthOf(1) }) + + it('Should correctly send a force end notification', async function () { + this.timeout(60000) + + let hadForcedEndEvent = false + + await servers[0].kill() + + const env = { PEERTUBE_TEST_CONSTANTS_VIDEO_LIVE_CLEANUP_DELAY: '20000' } + await servers[0].run({}, { env }) + + const liveVideoUUID = await createLiveWrapper({ permanentLive: true }) + + { + const videoId = await servers[0].videos.getId({ uuid: liveVideoUUID }) + + const localSocket = servers[0].socketIO.getLiveNotificationSocket() + localSocket.on('force-end', () => { hadForcedEndEvent = true }) + localSocket.emit('subscribe', { videoId }) + } + + // Streaming session #1 + const rtmpOptions = { + videoId: liveVideoUUID, + copyCodecs: true, + fixtureName: 'video_short.mp4' + } + + let ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo(rtmpOptions) + await servers[0].live.waitUntilPublished({ videoId: liveVideoUUID }) + + await stopFfmpeg(ffmpegCommand) + await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID }) + + // Streaming session #2 + ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo(rtmpOptions) + + // eslint-disable-next-line no-unmodified-loop-condition + while (!hadForcedEndEvent) { + await wait(500) + } + }) }) after(async function () { diff --git a/server/core/lib/live/live-manager.ts b/server/core/lib/live/live-manager.ts index e0cb87fad..604772894 100644 --- a/server/core/lib/live/live-manager.ts +++ b/server/core/lib/live/live-manager.ts @@ -279,6 +279,8 @@ class LiveManager { if (oldStreamingPlaylist) { if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) + PeerTubeSocket.Instance.sendVideoForceEnd(video) + await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) } diff --git a/server/core/lib/peertube-socket.ts b/server/core/lib/peertube-socket.ts index 17168f4ad..c6ae11fc8 100644 --- a/server/core/lib/peertube-socket.ts +++ b/server/core/lib/peertube-socket.ts @@ -8,6 +8,7 @@ import { UserNotificationModelForApi } from '@server/types/models/user/index.js' import { LiveVideoEventPayload, LiveVideoEventType } from '@peertube/peertube-models' import { logger } from '../helpers/logger.js' import { authenticateRunnerSocket, authenticateSocket } from '../middlewares/index.js' +import { isDevInstance } from '@peertube/peertube-node-utils' class PeerTubeSocket { @@ -20,7 +21,11 @@ class PeerTubeSocket { private constructor () {} init (server: HTTPServer) { - const io = new SocketServer(server) + const io = new SocketServer(server, { + cors: isDevInstance() + ? { origin: 'http://localhost:5173', methods: [ 'GET', 'POST' ] } + : undefined + }) io.of('/user-notifications') .use(authenticateSocket) @@ -88,6 +93,8 @@ class PeerTubeSocket { } } + // --------------------------------------------------------------------------- + sendVideoLiveNewState (video: MVideo) { const data: LiveVideoEventPayload = { state: video.state } const type: LiveVideoEventType = 'state-change' @@ -110,6 +117,18 @@ class PeerTubeSocket { .emit(type, data) } + sendVideoForceEnd (video: MVideo) { + const type: LiveVideoEventType = 'force-end' + + logger.debug('Sending video live "force end" notification of %s.', video.url) + + this.liveVideosNamespace + .in(video.id + '') + .emit(type) + } + + // --------------------------------------------------------------------------- + @Debounce({ timeoutMS: 1000 }) sendAvailableJobsPingToRunners () { logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)