From afb371d940531a5a8aafb5895b369b9cdecd555b Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 27 Jan 2023 09:04:02 +0100 Subject: [PATCH] Refactor playlist creation for lives --- server/lib/live/live-manager.ts | 34 ++---------- server/lib/live/shared/muxing-session.ts | 71 ++++++++++++++++-------- server/tests/api/server/proxy.ts | 2 +- 3 files changed, 54 insertions(+), 53 deletions(-) diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index fa4e1df07..1d5b8bf14 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -13,18 +13,18 @@ import { } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' +import { VIDEO_LIVE } from '@server/initializers/constants' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' +import { MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' import { pick, wait } from '@shared/core-utils' -import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models' +import { LiveVideoError, VideoState } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' -import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' +import { getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' import { Hooks } from '../plugins/hooks' import { LiveQuotaStore } from './live-quota-store' @@ -255,13 +255,10 @@ class LiveManager { { allResolutions, ...lTags(sessionId, video.uuid) } ) - const streamingPlaylist = await this.createLivePlaylist(video, allResolutions) - return this.runMuxingSession({ sessionId, videoLive, - streamingPlaylist, inputUrl, fps, bitrate, @@ -275,7 +272,6 @@ class LiveManager { sessionId: string videoLive: MVideoLiveVideo - streamingPlaylist: MStreamingPlaylistVideo inputUrl: string fps: number bitrate: number @@ -298,7 +294,7 @@ class LiveManager { videoLive, user, - ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) + ...pick(options, [ 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) }) muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) @@ -474,26 +470,6 @@ class LiveManager { return resolutionsEnabled } - private async createLivePlaylist (video: MVideo, allResolutions: number[]): Promise { - const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(video) - - playlist.playlistFilename = generateHLSMasterPlaylistFilename(true) - playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true) - - playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION - playlist.type = VideoStreamingPlaylistType.HLS - - playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED - ? VideoStorage.OBJECT_STORAGE - : VideoStorage.FILE_SYSTEM - - if (playlist.storage === VideoStorage.FILE_SYSTEM) { - playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) - } - - return playlist.save() - } - private saveStartingSession (videoLive: MVideoLiveVideo) { const liveSession = new VideoLiveSessionModel({ startDate: new Date(), diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 25ecf1c64..2727fc4a7 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -1,4 +1,3 @@ - import { mapSeries } from 'bluebird' import { FSWatcher, watch } from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' @@ -9,12 +8,18 @@ import { EventEmitter } from 'stream' import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' -import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' +import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' import { VideoFileModel } from '@server/models/video/video-file' +import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' -import { VideoStorage } from '@shared/models' -import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' +import { VideoStorage, VideoStreamingPlaylistType } from '@shared/models' +import { + generateHLSMasterPlaylistFilename, + generateHlsSha256SegmentsFilename, + getLiveDirectory, + getLiveReplayBaseDirectory +} from '../../paths' import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' import { LiveQuotaStore } from '../live-quota-store' @@ -53,7 +58,6 @@ class MuxingSession extends EventEmitter { private readonly user: MUserId private readonly sessionId: string private readonly videoLive: MVideoLiveVideo - private readonly streamingPlaylist: MStreamingPlaylistVideo private readonly inputUrl: string private readonly fps: number private readonly allResolutions: number[] @@ -70,12 +74,13 @@ class MuxingSession extends EventEmitter { private readonly outDirectory: string private readonly replayDirectory: string - private readonly liveSegmentShaStore: LiveSegmentShaStore - private readonly lTags: LoggerTagsFn private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} + private streamingPlaylist: MStreamingPlaylistVideo + private liveSegmentShaStore: LiveSegmentShaStore + private tsWatcher: FSWatcher private masterWatcher: FSWatcher private m3u8Watcher: FSWatcher @@ -98,7 +103,6 @@ class MuxingSession extends EventEmitter { user: MUserId sessionId: string videoLive: MVideoLiveVideo - streamingPlaylist: MStreamingPlaylistVideo inputUrl: string fps: number bitrate: number @@ -112,7 +116,6 @@ class MuxingSession extends EventEmitter { this.user = options.user this.sessionId = options.sessionId this.videoLive = options.videoLive - this.streamingPlaylist = options.streamingPlaylist this.inputUrl = options.inputUrl this.fps = options.fps @@ -131,17 +134,13 @@ class MuxingSession extends EventEmitter { this.outDirectory = getLiveDirectory(this.videoLive.Video) this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) - this.liveSegmentShaStore = new LiveSegmentShaStore({ - videoUUID: this.videoLive.Video.uuid, - sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), - streamingPlaylist: this.streamingPlaylist, - sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED - }) - this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) } async runMuxing () { + this.streamingPlaylist = await this.createLivePlaylist() + + this.createLiveShaStore() this.createFiles() await this.prepareDirectories() @@ -257,17 +256,18 @@ class MuxingSession extends EventEmitter { this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) this.masterWatcher.on('add', async () => { - if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { - try { + try { + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) this.streamingPlaylist.playlistUrl = url - this.streamingPlaylist.assignP2PMediaLoaderInfoHashes(this.videoLive.Video, this.allResolutions) - - await this.streamingPlaylist.save() - } catch (err) { - logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() }) } + + this.streamingPlaylist.assignP2PMediaLoaderInfoHashes(this.videoLive.Video, this.allResolutions) + + await this.streamingPlaylist.save() + } catch (err) { + logger.error('Cannot update streaming playlist.', { err, ...this.lTags() }) } this.masterPlaylistCreated = true @@ -478,6 +478,31 @@ class MuxingSession extends EventEmitter { logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags() }) } } + + private async createLivePlaylist (): Promise { + const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(this.videoLive.Video) + + playlist.playlistFilename = generateHLSMasterPlaylistFilename(true) + playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true) + + playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION + playlist.type = VideoStreamingPlaylistType.HLS + + playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED + ? VideoStorage.OBJECT_STORAGE + : VideoStorage.FILE_SYSTEM + + return playlist.save() + } + + private createLiveShaStore () { + this.liveSegmentShaStore = new LiveSegmentShaStore({ + videoUUID: this.videoLive.Video.uuid, + sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), + streamingPlaylist: this.streamingPlaylist, + sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED + }) + } } // --------------------------------------------------------------------------- diff --git a/server/tests/api/server/proxy.ts b/server/tests/api/server/proxy.ts index de8872d74..4bf89410e 100644 --- a/server/tests/api/server/proxy.ts +++ b/server/tests/api/server/proxy.ts @@ -95,7 +95,7 @@ describe('Test proxy', function () { } it('Should succeed import with the appropriate proxy config', async function () { - this.timeout(120000) + this.timeout(240000) await servers[0].kill() await servers[0].run({}, { env: goodEnv })