Try to fix live segments check
This commit is contained in:
parent
52a350a15c
commit
210856a7be
|
@ -95,7 +95,7 @@ export class EditCustomConfigComponent extends FormReactive implements OnInit, A
|
||||||
]
|
]
|
||||||
|
|
||||||
this.liveMaxDurationOptions = [
|
this.liveMaxDurationOptions = [
|
||||||
{ value: 0, label: $localize`No limit` },
|
{ value: null, label: $localize`No limit` },
|
||||||
{ value: 1000 * 3600, label: $localize`1 hour` },
|
{ value: 1000 * 3600, label: $localize`1 hour` },
|
||||||
{ value: 1000 * 3600 * 3, label: $localize`3 hours` },
|
{ value: 1000 * 3600 * 3, label: $localize`3 hours` },
|
||||||
{ value: 1000 * 3600 * 5, label: $localize`5 hours` },
|
{ value: 1000 * 3600 * 5, label: $localize`5 hours` },
|
||||||
|
@ -328,7 +328,13 @@ export class EditCustomConfigComponent extends FormReactive implements OnInit, A
|
||||||
}
|
}
|
||||||
|
|
||||||
async formValidated () {
|
async formValidated () {
|
||||||
this.configService.updateCustomConfig(this.form.getRawValue())
|
const value: CustomConfig = this.form.getRawValue()
|
||||||
|
|
||||||
|
// Transform "null" to null
|
||||||
|
const maxDuration = value.live.maxDuration as any
|
||||||
|
if (maxDuration === 'null') value.live.maxDuration = null
|
||||||
|
|
||||||
|
this.configService.updateCustomConfig(value)
|
||||||
.subscribe(
|
.subscribe(
|
||||||
res => {
|
res => {
|
||||||
this.customConfig = res
|
this.customConfig = res
|
||||||
|
|
|
@ -1,27 +1,32 @@
|
||||||
|
import { wait } from '@root-helpers/utils'
|
||||||
import { Segment } from 'p2p-media-loader-core'
|
import { Segment } from 'p2p-media-loader-core'
|
||||||
import { basename } from 'path'
|
import { basename } from 'path'
|
||||||
|
|
||||||
type SegmentsJSON = { [filename: string]: string | { [byterange: string]: string } }
|
type SegmentsJSON = { [filename: string]: string | { [byterange: string]: string } }
|
||||||
|
|
||||||
|
const maxRetries = 3
|
||||||
|
|
||||||
function segmentValidatorFactory (segmentsSha256Url: string) {
|
function segmentValidatorFactory (segmentsSha256Url: string) {
|
||||||
let segmentsJSON = fetchSha256Segments(segmentsSha256Url)
|
let segmentsJSON = fetchSha256Segments(segmentsSha256Url)
|
||||||
const regex = /bytes=(\d+)-(\d+)/
|
const regex = /bytes=(\d+)-(\d+)/
|
||||||
|
|
||||||
return async function segmentValidator (segment: Segment, canRefetchSegmentHashes = true) {
|
return async function segmentValidator (segment: Segment, retry = 1) {
|
||||||
const filename = basename(segment.url)
|
const filename = basename(segment.url)
|
||||||
|
|
||||||
const segmentValue = (await segmentsJSON)[filename]
|
const segmentValue = (await segmentsJSON)[filename]
|
||||||
|
|
||||||
if (!segmentValue && !canRefetchSegmentHashes) {
|
if (!segmentValue && retry > maxRetries) {
|
||||||
throw new Error(`Unknown segment name ${filename} in segment validator`)
|
throw new Error(`Unknown segment name ${filename} in segment validator`)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!segmentValue) {
|
if (!segmentValue) {
|
||||||
console.log('Refetching sha segments.')
|
await wait(1000)
|
||||||
|
|
||||||
|
console.log('Refetching sha segments for %s.', filename)
|
||||||
|
|
||||||
// Refetch
|
|
||||||
segmentsJSON = fetchSha256Segments(segmentsSha256Url)
|
segmentsJSON = fetchSha256Segments(segmentsSha256Url)
|
||||||
segmentValidator(segment, false)
|
await segmentValidator(segment, retry + 1)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,14 @@ function importModule (path: string) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function wait (ms: number) {
|
||||||
|
return new Promise(res => {
|
||||||
|
setTimeout(() => res(), ms)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
export {
|
export {
|
||||||
importModule,
|
importModule,
|
||||||
objectToUrlEncoded
|
objectToUrlEncoded,
|
||||||
|
wait
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
|
|
||||||
import { AsyncQueue, queue } from 'async'
|
|
||||||
import * as chokidar from 'chokidar'
|
import * as chokidar from 'chokidar'
|
||||||
import { FfmpegCommand } from 'fluent-ffmpeg'
|
import { FfmpegCommand } from 'fluent-ffmpeg'
|
||||||
import { ensureDir, stat } from 'fs-extra'
|
import { ensureDir, stat } from 'fs-extra'
|
||||||
|
@ -50,12 +49,6 @@ const config = {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type SegmentSha256QueueParam = {
|
|
||||||
operation: 'update' | 'delete'
|
|
||||||
videoUUID: string
|
|
||||||
segmentPath: string
|
|
||||||
}
|
|
||||||
|
|
||||||
class LiveManager {
|
class LiveManager {
|
||||||
|
|
||||||
private static instance: LiveManager
|
private static instance: LiveManager
|
||||||
|
@ -71,7 +64,6 @@ class LiveManager {
|
||||||
return isAbleToUploadVideo(userId, 1000)
|
return isAbleToUploadVideo(userId, 1000)
|
||||||
}, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
|
}, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
|
||||||
|
|
||||||
private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
|
|
||||||
private rtmpServer: any
|
private rtmpServer: any
|
||||||
|
|
||||||
private constructor () {
|
private constructor () {
|
||||||
|
@ -96,18 +88,6 @@ class LiveManager {
|
||||||
logger.info('Live session ended.', { sessionId })
|
logger.info('Live session ended.', { sessionId })
|
||||||
})
|
})
|
||||||
|
|
||||||
this.segmentsSha256Queue = queue<SegmentSha256QueueParam, Error>((options, cb) => {
|
|
||||||
const promise = options.operation === 'update'
|
|
||||||
? this.addSegmentSha(options)
|
|
||||||
: Promise.resolve(this.removeSegmentSha(options))
|
|
||||||
|
|
||||||
promise.then(() => cb())
|
|
||||||
.catch(err => {
|
|
||||||
logger.error('Cannot update/remove sha segment %s.', options.segmentPath, { err })
|
|
||||||
cb()
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
registerConfigChangedHandler(() => {
|
registerConfigChangedHandler(() => {
|
||||||
if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
|
if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
|
||||||
this.run()
|
this.run()
|
||||||
|
@ -294,11 +274,18 @@ class LiveManager {
|
||||||
|
|
||||||
const tsWatcher = chokidar.watch(outPath + '/*.ts')
|
const tsWatcher = chokidar.watch(outPath + '/*.ts')
|
||||||
|
|
||||||
const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
|
let segmentsToProcess: string[] = []
|
||||||
|
|
||||||
const addHandler = segmentPath => {
|
const addHandler = segmentPath => {
|
||||||
updateSegment(segmentPath)
|
// Add sha hash of previous segments, because ffmpeg should have finished generating them
|
||||||
|
for (const previousSegment of segmentsToProcess) {
|
||||||
|
this.addSegmentSha(videoUUID, previousSegment)
|
||||||
|
.catch(err => logger.error('Cannot add sha segment of video %s -> %s.', videoUUID, previousSegment, { err }))
|
||||||
|
}
|
||||||
|
|
||||||
|
segmentsToProcess = [ segmentPath ]
|
||||||
|
|
||||||
|
// Duration constraint check
|
||||||
if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
|
if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
|
||||||
logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
|
logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
|
||||||
|
|
||||||
|
@ -323,10 +310,9 @@ class LiveManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
|
const deleteHandler = segmentPath => this.removeSegmentSha(videoUUID, segmentPath)
|
||||||
|
|
||||||
tsWatcher.on('add', p => addHandler(p))
|
tsWatcher.on('add', p => addHandler(p))
|
||||||
tsWatcher.on('change', p => updateSegment(p))
|
|
||||||
tsWatcher.on('unlink', p => deleteHandler(p))
|
tsWatcher.on('unlink', p => deleteHandler(p))
|
||||||
|
|
||||||
const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
|
const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
|
||||||
|
@ -399,33 +385,33 @@ class LiveManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async addSegmentSha (options: SegmentSha256QueueParam) {
|
private async addSegmentSha (videoUUID: string, segmentPath: string) {
|
||||||
const segmentName = basename(options.segmentPath)
|
const segmentName = basename(segmentPath)
|
||||||
logger.debug('Updating live sha segment %s.', options.segmentPath)
|
logger.debug('Adding live sha segment %s.', segmentPath)
|
||||||
|
|
||||||
const shaResult = await buildSha256Segment(options.segmentPath)
|
const shaResult = await buildSha256Segment(segmentPath)
|
||||||
|
|
||||||
if (!this.segmentsSha256.has(options.videoUUID)) {
|
if (!this.segmentsSha256.has(videoUUID)) {
|
||||||
this.segmentsSha256.set(options.videoUUID, new Map())
|
this.segmentsSha256.set(videoUUID, new Map())
|
||||||
}
|
}
|
||||||
|
|
||||||
const filesMap = this.segmentsSha256.get(options.videoUUID)
|
const filesMap = this.segmentsSha256.get(videoUUID)
|
||||||
filesMap.set(segmentName, shaResult)
|
filesMap.set(segmentName, shaResult)
|
||||||
}
|
}
|
||||||
|
|
||||||
private removeSegmentSha (options: SegmentSha256QueueParam) {
|
private removeSegmentSha (videoUUID: string, segmentPath: string) {
|
||||||
const segmentName = basename(options.segmentPath)
|
const segmentName = basename(segmentPath)
|
||||||
|
|
||||||
logger.debug('Removing live sha segment %s.', options.segmentPath)
|
logger.debug('Removing live sha segment %s.', segmentPath)
|
||||||
|
|
||||||
const filesMap = this.segmentsSha256.get(options.videoUUID)
|
const filesMap = this.segmentsSha256.get(videoUUID)
|
||||||
if (!filesMap) {
|
if (!filesMap) {
|
||||||
logger.warn('Unknown files map to remove sha for %s.', options.videoUUID)
|
logger.warn('Unknown files map to remove sha for %s.', videoUUID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!filesMap.has(segmentName)) {
|
if (!filesMap.has(segmentName)) {
|
||||||
logger.warn('Unknown segment in files map for video %s and segment %s.', options.videoUUID, options.segmentPath)
|
logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue