Fix peertube runner concurrency
This commit is contained in:
parent
ef2e6aabf7
commit
fe7019b232
|
@ -170,18 +170,29 @@ export class RunnerServer {
|
||||||
private async checkAvailableJobs () {
|
private async checkAvailableJobs () {
|
||||||
if (this.checkingAvailableJobs) return
|
if (this.checkingAvailableJobs) return
|
||||||
|
|
||||||
logger.info('Checking available jobs')
|
|
||||||
|
|
||||||
this.checkingAvailableJobs = true
|
this.checkingAvailableJobs = true
|
||||||
|
|
||||||
|
let hadAvailableJob = false
|
||||||
|
|
||||||
for (const server of this.servers) {
|
for (const server of this.servers) {
|
||||||
try {
|
try {
|
||||||
|
logger.info('Checking available jobs on ' + server.url)
|
||||||
|
|
||||||
const job = await this.requestJob(server)
|
const job = await this.requestJob(server)
|
||||||
if (!job) continue
|
if (!job) continue
|
||||||
|
|
||||||
|
hadAvailableJob = true
|
||||||
|
|
||||||
await this.tryToExecuteJobAsync(server, job)
|
await this.tryToExecuteJobAsync(server, job)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
|
const code = (err.res?.body as PeerTubeProblemDocument)?.code
|
||||||
|
|
||||||
|
if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
|
||||||
|
logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
|
||||||
logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
|
logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
|
||||||
|
|
||||||
await this.unregisterRunner({ url: server.url })
|
await this.unregisterRunner({ url: server.url })
|
||||||
|
@ -193,6 +204,11 @@ export class RunnerServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.checkingAvailableJobs = false
|
this.checkingAvailableJobs = false
|
||||||
|
|
||||||
|
if (hadAvailableJob && this.canProcessMoreJobs()) {
|
||||||
|
this.checkAvailableJobs()
|
||||||
|
.catch(err => logger.error({ err }, 'Cannot check more available jobs'))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async requestJob (server: PeerTubeServer) {
|
private async requestJob (server: PeerTubeServer) {
|
||||||
|
@ -211,7 +227,7 @@ export class RunnerServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
|
private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
|
||||||
if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
|
if (!this.canProcessMoreJobs()) return
|
||||||
|
|
||||||
const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
|
const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
|
||||||
|
|
||||||
|
@ -242,6 +258,10 @@ export class RunnerServer {
|
||||||
return ConfigManager.Instance.setRegisteredInstances(data)
|
return ConfigManager.Instance.setRegisteredInstances(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private canProcessMoreJobs () {
|
||||||
|
return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
private async cleanupTMP () {
|
private async cleanupTMP () {
|
||||||
|
|
|
@ -42,6 +42,7 @@ import {
|
||||||
RunnerJobType,
|
RunnerJobType,
|
||||||
RunnerJobUpdateBody,
|
RunnerJobUpdateBody,
|
||||||
RunnerJobUpdatePayload,
|
RunnerJobUpdatePayload,
|
||||||
|
ServerErrorCode,
|
||||||
UserRight,
|
UserRight,
|
||||||
VideoStudioTranscodingSuccess,
|
VideoStudioTranscodingSuccess,
|
||||||
VODAudioMergeTranscodingSuccess,
|
VODAudioMergeTranscodingSuccess,
|
||||||
|
@ -168,6 +169,7 @@ async function acceptRunnerJob (req: express.Request, res: express.Response) {
|
||||||
|
|
||||||
if (runnerJob.state !== RunnerJobState.PENDING) {
|
if (runnerJob.state !== RunnerJobState.PENDING) {
|
||||||
res.fail({
|
res.fail({
|
||||||
|
type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
|
||||||
message: 'This job is not in pending state anymore',
|
message: 'This job is not in pending state anymore',
|
||||||
status: HttpStatusCode.CONFLICT_409
|
status: HttpStatusCode.CONFLICT_409
|
||||||
})
|
})
|
||||||
|
|
|
@ -68,6 +68,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
|
||||||
abort () {
|
abort () {
|
||||||
if (this.ended || this.errored || this.aborted) return
|
if (this.ended || this.errored || this.aborted) return
|
||||||
|
|
||||||
|
logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags())
|
||||||
|
|
||||||
this.ffmpegCommand.kill('SIGINT')
|
this.ffmpegCommand.kill('SIGINT')
|
||||||
|
|
||||||
this.aborted = true
|
this.aborted = true
|
||||||
|
@ -95,6 +97,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
|
||||||
private onFFmpegEnded () {
|
private onFFmpegEnded () {
|
||||||
if (this.ended || this.errored || this.aborted) return
|
if (this.ended || this.errored || this.aborted) return
|
||||||
|
|
||||||
|
logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags())
|
||||||
|
|
||||||
this.ended = true
|
this.ended = true
|
||||||
this.emit('end')
|
this.emit('end')
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ describe('Open Telemetry', function () {
|
||||||
it('Should enable open telemetry metrics', async function () {
|
it('Should enable open telemetry metrics', async function () {
|
||||||
this.timeout(120000)
|
this.timeout(120000)
|
||||||
|
|
||||||
server = await createSingleServer(1, {
|
await server.run({
|
||||||
open_telemetry: {
|
open_telemetry: {
|
||||||
metrics: {
|
metrics: {
|
||||||
enabled: true
|
enabled: true
|
||||||
|
@ -73,7 +73,7 @@ describe('Open Telemetry', function () {
|
||||||
it('Should disable http request duration metrics', async function () {
|
it('Should disable http request duration metrics', async function () {
|
||||||
await server.kill()
|
await server.kill()
|
||||||
|
|
||||||
server = await createSingleServer(1, {
|
await server.run({
|
||||||
open_telemetry: {
|
open_telemetry: {
|
||||||
metrics: {
|
metrics: {
|
||||||
enabled: true,
|
enabled: true,
|
||||||
|
@ -114,7 +114,7 @@ describe('Open Telemetry', function () {
|
||||||
})
|
})
|
||||||
|
|
||||||
it('Should enable open telemetry metrics', async function () {
|
it('Should enable open telemetry metrics', async function () {
|
||||||
server = await createSingleServer(1, {
|
await server.run({
|
||||||
open_telemetry: {
|
open_telemetry: {
|
||||||
tracing: {
|
tracing: {
|
||||||
enabled: true,
|
enabled: true,
|
||||||
|
|
|
@ -189,7 +189,7 @@ describe('Test VOD transcoding in peertube-runner program', function () {
|
||||||
})
|
})
|
||||||
|
|
||||||
it('Should transcode videos on manual run', async function () {
|
it('Should transcode videos on manual run', async function () {
|
||||||
this.timeout(120000)
|
this.timeout(240000)
|
||||||
|
|
||||||
await servers[0].config.disableTranscoding()
|
await servers[0].config.disableTranscoding()
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,7 @@ export const enum ServerErrorCode {
|
||||||
ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected',
|
ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected',
|
||||||
|
|
||||||
RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state',
|
RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state',
|
||||||
|
RUNNER_JOB_NOT_IN_PENDING_STATE = 'runner_job_not_in_pending_state',
|
||||||
UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token'
|
UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue