Priorize audio transcoding on if audio/video split
This commit is contained in:
parent
906fbdc08c
commit
43b70c0d65
|
@ -22,7 +22,7 @@
|
||||||
@else {
|
@else {
|
||||||
<my-reactive-file
|
<my-reactive-file
|
||||||
class="d-inline-block"
|
class="d-inline-block"
|
||||||
inputName="uploadNewThumbnail" inputLabel="Select from your device" [extensions]="videoImageExtensions"
|
inputName="uploadNewThumbnail" i18n-inputLabel inputLabel="Select from your device" [extensions]="videoImageExtensions"
|
||||||
[maxFileSize]="maxVideoImageSize" placement="right" (fileChanged)="onFileChanged($event)"
|
[maxFileSize]="maxVideoImageSize" placement="right" (fileChanged)="onFileChanged($event)"
|
||||||
[buttonTooltip]="getReactiveFileButtonTooltip()" theme="primary">
|
[buttonTooltip]="getReactiveFileButtonTooltip()" theme="primary">
|
||||||
</my-reactive-file>
|
</my-reactive-file>
|
||||||
|
|
|
@ -43,7 +43,7 @@ export function shuffle <T> (elements: T[]) {
|
||||||
return shuffled
|
return shuffled
|
||||||
}
|
}
|
||||||
|
|
||||||
export function sortBy (obj: any[], key1: string, key2?: string) {
|
export function sortBy <T> (obj: T[], key1: string, key2?: string): T[] {
|
||||||
return obj.sort((a, b) => {
|
return obj.sort((a, b) => {
|
||||||
const elem1 = key2 ? a[key1][key2] : a[key1]
|
const elem1 = key2 ? a[key1][key2] : a[key1]
|
||||||
const elem2 = key2 ? b[key1][key2] : b[key1]
|
const elem2 = key2 ? b[key1][key2] : b[key1]
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
|
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
|
||||||
|
|
||||||
import { join } from 'path'
|
import { sortBy } from '@peertube/peertube-core-utils'
|
||||||
import { HttpStatusCode } from '@peertube/peertube-models'
|
import { HLSTranscodingPayload, HttpStatusCode } from '@peertube/peertube-models'
|
||||||
import { areMockObjectStorageTestsDisabled } from '@peertube/peertube-node-utils'
|
import { areMockObjectStorageTestsDisabled } from '@peertube/peertube-node-utils'
|
||||||
import {
|
import {
|
||||||
cleanupTests,
|
cleanupTests,
|
||||||
|
@ -15,6 +15,8 @@ import {
|
||||||
import { DEFAULT_AUDIO_RESOLUTION } from '@peertube/peertube-server/core/initializers/constants.js'
|
import { DEFAULT_AUDIO_RESOLUTION } from '@peertube/peertube-server/core/initializers/constants.js'
|
||||||
import { checkDirectoryIsEmpty, checkTmpIsEmpty } from '@tests/shared/directories.js'
|
import { checkDirectoryIsEmpty, checkTmpIsEmpty } from '@tests/shared/directories.js'
|
||||||
import { completeCheckHlsPlaylist } from '@tests/shared/streaming-playlists.js'
|
import { completeCheckHlsPlaylist } from '@tests/shared/streaming-playlists.js'
|
||||||
|
import { expect } from 'chai'
|
||||||
|
import { join } from 'path'
|
||||||
|
|
||||||
describe('Test HLS with audio and video splitted', function () {
|
describe('Test HLS with audio and video splitted', function () {
|
||||||
let servers: PeerTubeServer[] = []
|
let servers: PeerTubeServer[] = []
|
||||||
|
@ -50,6 +52,30 @@ describe('Test HLS with audio and video splitted', function () {
|
||||||
await completeCheckHlsPlaylist({ servers, videoUUID: uuid, hlsOnly, splittedAudio: true, objectStorageBaseUrl })
|
await completeCheckHlsPlaylist({ servers, videoUUID: uuid, hlsOnly, splittedAudio: true, objectStorageBaseUrl })
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if (concurrency === 1) {
|
||||||
|
it('Should have processed audio just after first video resolution', async function () {
|
||||||
|
const { data } = await servers[0].jobs.list({ jobType: 'video-transcoding' })
|
||||||
|
|
||||||
|
const hlsJobs = data.filter(job => {
|
||||||
|
const data = job.data as HLSTranscodingPayload
|
||||||
|
|
||||||
|
return data.videoUUID === videoUUIDs[0] && data.type === 'new-resolution-to-hls'
|
||||||
|
})
|
||||||
|
|
||||||
|
const dataForVideo = sortBy(hlsJobs, 'processedOn')
|
||||||
|
|
||||||
|
expect((dataForVideo[0].data as HLSTranscodingPayload).resolution).to.equal(720)
|
||||||
|
|
||||||
|
// FIXME: next job after parent succeeded should be audio but bullmq seems to fire another one when it updates the child state
|
||||||
|
// It's the reason why another video stream transcoding is executed before the audio
|
||||||
|
try {
|
||||||
|
expect((dataForVideo[1].data as HLSTranscodingPayload).resolution).to.equal(0)
|
||||||
|
} catch (err) {
|
||||||
|
expect((dataForVideo[2].data as HLSTranscodingPayload).resolution).to.equal(0)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
it('Should upload an audio file and transcode it to HLS', async function () {
|
it('Should upload an audio file and transcode it to HLS', async function () {
|
||||||
this.timeout(120000)
|
this.timeout(120000)
|
||||||
|
|
||||||
|
|
|
@ -23,8 +23,8 @@ export abstract class AbstractJobBuilder <P> {
|
||||||
}) {
|
}) {
|
||||||
const { video, videoFile, isNewVideo, user, videoFileAlreadyLocked } = options
|
const { video, videoFile, isNewVideo, user, videoFileAlreadyLocked } = options
|
||||||
|
|
||||||
let mergeOrOptimizePayload: P
|
let mergeOrOptimizePayload: P & { higherPriority?: boolean }
|
||||||
let children: P[][] = []
|
let children: (P & { higherPriority?: boolean })[][] = []
|
||||||
|
|
||||||
const mutexReleaser = videoFileAlreadyLocked
|
const mutexReleaser = videoFileAlreadyLocked
|
||||||
? () => {}
|
? () => {}
|
||||||
|
@ -72,16 +72,16 @@ export abstract class AbstractJobBuilder <P> {
|
||||||
|
|
||||||
// HLS version of max resolution
|
// HLS version of max resolution
|
||||||
if (CONFIG.TRANSCODING.HLS.ENABLED === true) {
|
if (CONFIG.TRANSCODING.HLS.ENABLED === true) {
|
||||||
const hasSplitAndioTranscoding = CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO && videoFile.hasAudio()
|
const hasSplitAudioTranscoding = CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO && videoFile.hasAudio()
|
||||||
|
|
||||||
// We had some issues with a web video quick transcoded while producing a HLS version of it
|
// We had some issues with a web video quick transcoded while producing a HLS version of it
|
||||||
const copyCodecs = !quickTranscode
|
const copyCodecs = !quickTranscode
|
||||||
|
|
||||||
const hlsPayloads: P[] = []
|
const hlsPayloads: (P & { higherPriority?: boolean })[] = []
|
||||||
|
|
||||||
hlsPayloads.push(
|
hlsPayloads.push(
|
||||||
this.buildHLSJobPayload({
|
this.buildHLSJobPayload({
|
||||||
deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED && !hasSplitAndioTranscoding,
|
deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED && !hasSplitAudioTranscoding,
|
||||||
|
|
||||||
separatedAudio: CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO,
|
separatedAudio: CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO,
|
||||||
|
|
||||||
|
@ -94,20 +94,24 @@ export abstract class AbstractJobBuilder <P> {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
if (hasSplitAndioTranscoding) {
|
if (hasSplitAudioTranscoding) {
|
||||||
hlsAudioAlreadyGenerated = true
|
hlsAudioAlreadyGenerated = true
|
||||||
|
|
||||||
hlsPayloads.push(
|
hlsPayloads.push(
|
||||||
this.buildHLSJobPayload({
|
{
|
||||||
deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED,
|
higherPriority: true,
|
||||||
separatedAudio: CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO,
|
|
||||||
|
|
||||||
copyCodecs,
|
...this.buildHLSJobPayload({
|
||||||
resolution: 0,
|
deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED,
|
||||||
fps: 0,
|
separatedAudio: CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO,
|
||||||
video,
|
|
||||||
isNewVideo
|
copyCodecs,
|
||||||
})
|
resolution: 0,
|
||||||
|
fps: 0,
|
||||||
|
video,
|
||||||
|
isNewVideo
|
||||||
|
})
|
||||||
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,7 +266,8 @@ export abstract class AbstractJobBuilder <P> {
|
||||||
|
|
||||||
protected abstract createJobs (options: {
|
protected abstract createJobs (options: {
|
||||||
video: MVideoFullLight
|
video: MVideoFullLight
|
||||||
payloads: [ [ P ], ...(P[][]) ] // Array of sequential jobs to create that depend on parent job
|
// Array of sequential jobs to create that depend on parent job
|
||||||
|
payloads: [ [ (P & { higherPriority?: boolean }) ], ...((P & { higherPriority?: boolean })[][]) ]
|
||||||
user: MUserId | null
|
user: MUserId | null
|
||||||
}): Promise<void>
|
}): Promise<void>
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,6 @@ import {
|
||||||
import { CreateJobArgument, JobQueue } from '@server/lib/job-queue/index.js'
|
import { CreateJobArgument, JobQueue } from '@server/lib/job-queue/index.js'
|
||||||
import { VideoJobInfoModel } from '@server/models/video/video-job-info.js'
|
import { VideoJobInfoModel } from '@server/models/video/video-job-info.js'
|
||||||
import { MUserId, MVideo } from '@server/types/models/index.js'
|
import { MUserId, MVideo } from '@server/types/models/index.js'
|
||||||
import Bluebird from 'bluebird'
|
|
||||||
import { getTranscodingJobPriority } from '../../transcoding-priority.js'
|
import { getTranscodingJobPriority } from '../../transcoding-priority.js'
|
||||||
import { AbstractJobBuilder } from './abstract-job-builder.js'
|
import { AbstractJobBuilder } from './abstract-job-builder.js'
|
||||||
|
|
||||||
|
@ -18,21 +17,29 @@ type Payload =
|
||||||
NewWebVideoResolutionTranscodingPayload |
|
NewWebVideoResolutionTranscodingPayload |
|
||||||
HLSTranscodingPayload
|
HLSTranscodingPayload
|
||||||
|
|
||||||
|
type PayloadWithPriority = Payload & { higherPriority?: boolean }
|
||||||
|
|
||||||
export class TranscodingJobQueueBuilder extends AbstractJobBuilder <Payload> {
|
export class TranscodingJobQueueBuilder extends AbstractJobBuilder <Payload> {
|
||||||
|
|
||||||
protected async createJobs (options: {
|
protected async createJobs (options: {
|
||||||
video: MVideo
|
video: MVideo
|
||||||
payloads: [ [ Payload ], ...(Payload[][]) ] // Array of sequential jobs to create that depend on parent job
|
// Array of sequential jobs to create that depend on parent job
|
||||||
|
payloads: [ [ PayloadWithPriority ], ...(PayloadWithPriority[][]) ]
|
||||||
user: MUserId | null
|
user: MUserId | null
|
||||||
}): Promise<void> {
|
}): Promise<void> {
|
||||||
const { video, payloads, user } = options
|
const { video, payloads, user } = options
|
||||||
|
|
||||||
|
const priority = await getTranscodingJobPriority({ user, type: 'vod', fallback: undefined })
|
||||||
|
|
||||||
const parent = payloads[0][0]
|
const parent = payloads[0][0]
|
||||||
payloads.shift()
|
payloads.shift()
|
||||||
|
|
||||||
const nextTranscodingSequentialJobs = await Bluebird.mapSeries(payloads, p => {
|
const nextTranscodingSequentialJobs = payloads.map(p => {
|
||||||
return Bluebird.mapSeries(p, payload => {
|
return p.map(payload => {
|
||||||
return this.buildTranscodingJob({ payload, user })
|
return this.buildTranscodingJob({
|
||||||
|
payload,
|
||||||
|
priority: payload.higherPriority ? priority - 1 : priority
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -44,7 +51,11 @@ export class TranscodingJobQueueBuilder extends AbstractJobBuilder <Payload> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const parentJob = await this.buildTranscodingJob({ payload: parent, user, hasChildren: payloads.length !== 0 })
|
const parentJob = this.buildTranscodingJob({
|
||||||
|
payload: parent,
|
||||||
|
priority: parent.higherPriority ? priority - 1 : priority,
|
||||||
|
hasChildren: payloads.length !== 0
|
||||||
|
})
|
||||||
|
|
||||||
await JobQueue.Instance.createSequentialJobFlow(parentJob, transcodingJobBuilderJob)
|
await JobQueue.Instance.createSequentialJobFlow(parentJob, transcodingJobBuilderJob)
|
||||||
|
|
||||||
|
@ -52,16 +63,16 @@ export class TranscodingJobQueueBuilder extends AbstractJobBuilder <Payload> {
|
||||||
await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
|
await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
|
||||||
}
|
}
|
||||||
|
|
||||||
private async buildTranscodingJob (options: {
|
private buildTranscodingJob (options: {
|
||||||
payload: VideoTranscodingPayload
|
payload: VideoTranscodingPayload
|
||||||
hasChildren?: boolean
|
hasChildren?: boolean
|
||||||
user: MUserId | null // null means we don't want priority
|
priority: number
|
||||||
}) {
|
}) {
|
||||||
const { user, payload, hasChildren = false } = options
|
const { priority, payload, hasChildren = false } = options
|
||||||
|
|
||||||
return {
|
return {
|
||||||
type: 'video-transcoding' as 'video-transcoding',
|
type: 'video-transcoding' as 'video-transcoding',
|
||||||
priority: await getTranscodingJobPriority({ user, type: 'vod', fallback: undefined }),
|
priority,
|
||||||
payload: { ...payload, hasChildren }
|
payload: { ...payload, hasChildren }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,12 +26,14 @@ type Payload = {
|
||||||
options: Omit<Parameters<VODWebVideoTranscodingJobHandler['create']>[0], 'priority'>
|
options: Omit<Parameters<VODWebVideoTranscodingJobHandler['create']>[0], 'priority'>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PayloadWithPriority = Payload & { higherPriority?: boolean }
|
||||||
|
|
||||||
// eslint-disable-next-line max-len
|
// eslint-disable-next-line max-len
|
||||||
export class TranscodingRunnerJobBuilder extends AbstractJobBuilder <Payload> {
|
export class TranscodingRunnerJobBuilder extends AbstractJobBuilder <Payload> {
|
||||||
|
|
||||||
protected async createJobs (options: {
|
protected async createJobs (options: {
|
||||||
video: MVideo
|
video: MVideo
|
||||||
payloads: [ [ Payload ], ...(Payload[][]) ] // Array of sequential jobs to create that depend on parent job
|
payloads: [ [ PayloadWithPriority ], ...(PayloadWithPriority[][]) ] // Array of sequential jobs to create that depend on parent job
|
||||||
user: MUserId | null
|
user: MUserId | null
|
||||||
}): Promise<void> {
|
}): Promise<void> {
|
||||||
const { payloads, user } = options
|
const { payloads, user } = options
|
||||||
|
@ -39,7 +41,12 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder <Payload> {
|
||||||
const parent = payloads[0][0]
|
const parent = payloads[0][0]
|
||||||
payloads.shift()
|
payloads.shift()
|
||||||
|
|
||||||
const parentJob = await this.createJob({ payload: parent, user })
|
const priority = await getTranscodingJobPriority({ user, type: 'vod', fallback: 0 })
|
||||||
|
|
||||||
|
const parentJob = await this.createJob({
|
||||||
|
payload: parent,
|
||||||
|
priority: parent.higherPriority ? priority - 1 : priority
|
||||||
|
})
|
||||||
|
|
||||||
for (const parallelPayloads of payloads) {
|
for (const parallelPayloads of payloads) {
|
||||||
let lastJob = parentJob
|
let lastJob = parentJob
|
||||||
|
@ -47,8 +54,8 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder <Payload> {
|
||||||
for (const parallelPayload of parallelPayloads) {
|
for (const parallelPayload of parallelPayloads) {
|
||||||
lastJob = await this.createJob({
|
lastJob = await this.createJob({
|
||||||
payload: parallelPayload,
|
payload: parallelPayload,
|
||||||
dependsOnRunnerJob: lastJob,
|
priority: parallelPayload.higherPriority ? priority - 1 : priority,
|
||||||
user
|
dependsOnRunnerJob: lastJob
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,10 +65,10 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder <Payload> {
|
||||||
|
|
||||||
private async createJob (options: {
|
private async createJob (options: {
|
||||||
payload: Payload
|
payload: Payload
|
||||||
user: MUserId | null
|
priority: number
|
||||||
dependsOnRunnerJob?: MRunnerJob
|
dependsOnRunnerJob?: MRunnerJob
|
||||||
}) {
|
}) {
|
||||||
const { dependsOnRunnerJob, payload, user } = options
|
const { dependsOnRunnerJob, payload, priority } = options
|
||||||
|
|
||||||
const builder = new payload.Builder()
|
const builder = new payload.Builder()
|
||||||
|
|
||||||
|
@ -69,7 +76,7 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder <Payload> {
|
||||||
...(payload.options as any), // FIXME: typings
|
...(payload.options as any), // FIXME: typings
|
||||||
|
|
||||||
dependsOnRunnerJob,
|
dependsOnRunnerJob,
|
||||||
priority: await getTranscodingJobPriority({ user, type: 'vod', fallback: 0 })
|
priority
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -126,8 +126,6 @@ export class VideoViewerCounters {
|
||||||
getTotalViewersOf (video: MVideoImmutable) {
|
getTotalViewersOf (video: MVideoImmutable) {
|
||||||
const viewers = this.viewersPerVideo.get(video.id)
|
const viewers = this.viewersPerVideo.get(video.id)
|
||||||
|
|
||||||
logger.error('toto', { viewers })
|
|
||||||
|
|
||||||
return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0
|
return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue