Implement graceful shutdown in runner

This commit is contained in:
Chocobozzz 2024-12-12 11:29:43 +01:00
parent 8e8d9f9358
commit 33a68f74dd
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
11 changed files with 193 additions and 30 deletions

View File

@ -3,6 +3,7 @@
import { Command, InvalidArgumentError } from '@commander-js/extra-typings' import { Command, InvalidArgumentError } from '@commander-js/extra-typings'
import { RunnerJobType } from '@peertube/peertube-models' import { RunnerJobType } from '@peertube/peertube-models'
import { listRegistered, registerRunner, unregisterRunner } from './register/index.js' import { listRegistered, registerRunner, unregisterRunner } from './register/index.js'
import { gracefulShutdown } from './register/shutdown.js'
import { RunnerServer } from './server/index.js' import { RunnerServer } from './server/index.js'
import { getSupportedJobsList } from './server/shared/supported-job.js' import { getSupportedJobsList } from './server/shared/supported-job.js'
import { ConfigManager, logger } from './shared/index.js' import { ConfigManager, logger } from './shared/index.js'
@ -98,6 +99,18 @@ program.command('list-registered')
} }
}) })
program.command('graceful-shutdown')
.description('Exit runner when all processing tasks are finished')
.action(async () => {
try {
await gracefulShutdown()
} catch (err) {
console.error('Cannot graceful shutdown the runner.')
console.error(err)
process.exit(-1)
}
})
program.parse() program.parse()
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -0,0 +1,10 @@
import { IPCClient } from '../shared/ipc/index.js'
export async function gracefulShutdown () {
const client = new IPCClient()
await client.run()
await client.askGracefulShutdown()
client.stop()
}

View File

@ -23,6 +23,7 @@ export class RunnerServer {
private checkingAvailableJobs = false private checkingAvailableJobs = false
private gracefulShutdown = false
private cleaningUp = false private cleaningUp = false
private initialized = false private initialized = false
@ -182,6 +183,15 @@ export class RunnerServer {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
requestGracefulShutdown () {
logger.info('Received graceful shutdown request')
this.gracefulShutdown = true
this.exitGracefullyIfNoProcessingJobs()
}
// ---------------------------------------------------------------------------
private safeAsyncCheckAvailableJobs () { private safeAsyncCheckAvailableJobs () {
this.checkAvailableJobs() this.checkAvailableJobs()
.catch(err => logger.error({ err }, `Cannot check available jobs`)) .catch(err => logger.error({ err }, `Cannot check available jobs`))
@ -190,6 +200,7 @@ export class RunnerServer {
private async checkAvailableJobs () { private async checkAvailableJobs () {
if (!this.initialized) return if (!this.initialized) return
if (this.checkingAvailableJobs) return if (this.checkingAvailableJobs) return
if (this.gracefulShutdown) return
this.checkingAvailableJobs = true this.checkingAvailableJobs = true
@ -260,9 +271,12 @@ export class RunnerServer {
private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
if (!this.canProcessMoreJobs()) { if (!this.canProcessMoreJobs()) {
logger.info( if (!this.gracefulShutdown) {
`Do not process more jobs (processing ${this.processingJobs.length} / ${ConfigManager.Instance.getConfig().jobs.concurrency})` logger.info(
) `Do not process more jobs (processing ${this.processingJobs.length} / ${ConfigManager.Instance.getConfig().jobs.concurrency})`
)
}
return return
} }
@ -281,6 +295,8 @@ export class RunnerServer {
.finally(() => { .finally(() => {
this.processingJobs = this.processingJobs.filter(p => p !== processingJob) this.processingJobs = this.processingJobs.filter(p => p !== processingJob)
if (this.gracefulShutdown) this.exitGracefullyIfNoProcessingJobs()
return this.checkAvailableJobs() return this.checkAvailableJobs()
}) })
} }
@ -296,6 +312,9 @@ export class RunnerServer {
} }
private canProcessMoreJobs () { private canProcessMoreJobs () {
if (this.cleaningUp) return false
if (this.gracefulShutdown) return false
return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
} }
@ -309,6 +328,15 @@ export class RunnerServer {
} }
} }
private exitGracefullyIfNoProcessingJobs () {
if (this.processingJobs.length !== 0) return
logger.info('Shutting down the runner after graceful shutdown request')
this.onExit()
.catch(err => logger.error({ err }, 'Cannot exit runner'))
}
private async onExit () { private async onExit () {
if (this.cleaningUp) return if (this.cleaningUp) return
this.cleaningUp = true this.cleaningUp = true

View File

@ -2,7 +2,7 @@ import CliTable3 from 'cli-table3'
import { ensureDir } from 'fs-extra/esm' import { ensureDir } from 'fs-extra/esm'
import { Client as NetIPC } from '@peertube/net-ipc' import { Client as NetIPC } from '@peertube/net-ipc'
import { ConfigManager } from '../config-manager.js' import { ConfigManager } from '../config-manager.js'
import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js' import { IPCResponse, IPCResponseData, IPCRequest } from './shared/index.js'
export class IPCClient { export class IPCClient {
private netIPC: NetIPC private netIPC: NetIPC
@ -39,7 +39,7 @@ export class IPCClient {
...options ...options
} }
const { success, error } = await this.netIPC.request(req) as IPCReponse const { success, error } = await this.netIPC.request(req) as IPCResponse
if (success) console.log('PeerTube instance registered') if (success) console.log('PeerTube instance registered')
else console.error('Could not register PeerTube instance on runner server side', error) else console.error('Could not register PeerTube instance on runner server side', error)
@ -54,7 +54,7 @@ export class IPCClient {
...options ...options
} }
const { success, error } = await this.netIPC.request(req) as IPCReponse const { success, error } = await this.netIPC.request(req) as IPCResponse
if (success) console.log('PeerTube instance unregistered') if (success) console.log('PeerTube instance unregistered')
else console.error('Could not unregister PeerTube instance on runner server side', error) else console.error('Could not unregister PeerTube instance on runner server side', error)
@ -65,7 +65,7 @@ export class IPCClient {
type: 'list-registered' type: 'list-registered'
} }
const { success, error, data } = await this.netIPC.request(req) as IPCReponse<IPCReponseData> const { success, error, data } = await this.netIPC.request(req) as IPCResponse<IPCResponseData>
if (!success) { if (!success) {
console.error('Could not list registered PeerTube instances', error) console.error('Could not list registered PeerTube instances', error)
return return
@ -82,6 +82,19 @@ export class IPCClient {
console.log(table.toString()) console.log(table.toString())
} }
// ---------------------------------------------------------------------------
async askGracefulShutdown () {
const req: IPCRequest = { type: 'graceful-shutdown' }
const { success, error } = await this.netIPC.request(req) as IPCResponse
if (success) console.log('Graceful shutdown acknowledged by the runner')
else console.error('Could not graceful shutdown runner', error)
}
// ---------------------------------------------------------------------------
stop () { stop () {
this.netIPC.destroy() this.netIPC.destroy()
} }

View File

@ -4,7 +4,7 @@ import { pick } from '@peertube/peertube-core-utils'
import { RunnerServer } from '../../server/index.js' import { RunnerServer } from '../../server/index.js'
import { ConfigManager } from '../config-manager.js' import { ConfigManager } from '../config-manager.js'
import { logger } from '../logger.js' import { logger } from '../logger.js'
import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js' import { IPCResponse, IPCResponseData, IPCRequest } from './shared/index.js'
export class IPCServer { export class IPCServer {
private netIPC: NetIPC private netIPC: NetIPC
@ -25,10 +25,10 @@ export class IPCServer {
try { try {
const data = await this.process(req) const data = await this.process(req)
this.sendReponse(res, { success: true, data }) this.sendResponse(res, { success: true, data })
} catch (err) { } catch (err) {
logger.error('Cannot execute RPC call', err) logger.error({ err }, 'Cannot execute RPC call')
this.sendReponse(res, { success: false, error: err.message }) this.sendResponse(res, { success: false, error: err.message })
} }
}) })
} }
@ -46,14 +46,18 @@ export class IPCServer {
case 'list-registered': case 'list-registered':
return Promise.resolve(this.runnerServer.listRegistered()) return Promise.resolve(this.runnerServer.listRegistered())
case 'graceful-shutdown':
this.runnerServer.requestGracefulShutdown()
return undefined
default: default:
throw new Error('Unknown RPC call ' + (req as any).type) throw new Error('Unknown RPC call ' + (req as any).type)
} }
} }
private sendReponse <T extends IPCReponseData> ( private sendResponse <T extends IPCResponseData> (
response: (data: any) => Promise<void>, response: (data: any) => Promise<void>,
body: IPCReponse<T> body: IPCResponse<T>
) { ) {
response(body) response(body)
.catch(err => logger.error('Cannot send response after IPC request', err)) .catch(err => logger.error('Cannot send response after IPC request', err))

View File

@ -1,7 +1,8 @@
export type IPCRequest = export type IPCRequest =
IPCRequestRegister | IPCRequestRegister |
IPCRequestUnregister | IPCRequestUnregister |
IPCRequestListRegistered IPCRequestListRegistered |
IPCRequestGracefulShutdown
export type IPCRequestRegister = { export type IPCRequestRegister = {
type: 'register' type: 'register'
@ -13,3 +14,5 @@ export type IPCRequestRegister = {
export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string } export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string }
export type IPCRequestListRegistered = { type: 'list-registered' } export type IPCRequestListRegistered = { type: 'list-registered' }
export type IPCRequestGracefulShutdown = { type: 'graceful-shutdown' }

View File

@ -1,10 +1,10 @@
export type IPCReponse <T extends IPCReponseData = undefined> = { export type IPCResponse <T extends IPCResponseData = undefined> = {
success: boolean success: boolean
error?: string error?: string
data?: T data?: T
} }
export type IPCReponseData = export type IPCResponseData =
// list registered // list registered
{ {
servers: { servers: {

View File

@ -2,38 +2,30 @@ import short, { SUUID } from 'short-uuid'
const translator = short() const translator = short()
function buildUUID () { export function buildUUID () {
return short.uuid() return short.uuid()
} }
function buildSUUID (): SUUID { export function buildSUUID (): SUUID {
return short.generate() return short.generate()
} }
function uuidToShort (uuid: string) { export function uuidToShort (uuid: string) {
if (!uuid) return uuid if (!uuid) return uuid
return translator.fromUUID(uuid) return translator.fromUUID(uuid)
} }
function shortToUUID (shortUUID: string) { export function shortToUUID (shortUUID: string) {
if (!shortUUID) return shortUUID if (!shortUUID) return shortUUID
return translator.toUUID(shortUUID) return translator.toUUID(shortUUID)
} }
function isShortUUID (value: string) { export function isShortUUID (value: string) {
if (!value) return false if (!value) return false
return value.length === translator.maxLength return value.length === translator.maxLength
} }
export {
buildUUID,
buildSUUID,
uuidToShort,
shortToUUID,
isShortUUID
}
export type { SUUID } export type { SUUID }

View File

@ -1,6 +1,7 @@
export * from './client-cli.js' export * from './client-cli.js'
export * from './live-transcoding.js' export * from './live-transcoding.js'
export * from './replace-file.js' export * from './replace-file.js'
export * from './shutdown.js'
export * from './studio-transcoding.js' export * from './studio-transcoding.js'
export * from './video-transcription.js' export * from './video-transcription.js'
export * from './vod-transcoding.js' export * from './vod-transcoding.js'

View File

@ -0,0 +1,85 @@
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import { wait } from '@peertube/peertube-core-utils'
import { RunnerJob, RunnerJobState } from '@peertube/peertube-models'
import { buildUUID } from '@peertube/peertube-node-utils'
import {
cleanupTests,
createSingleServer,
PeerTubeServer,
setAccessTokensToServers,
setDefaultVideoChannel
} from '@peertube/peertube-server-commands'
import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js'
import { expect } from 'chai'
describe('Test peertube-runner shutdown', function () {
let server: PeerTubeServer
let peertubeRunner: PeerTubeRunnerProcess
async function runRunner () {
const registrationToken = await server.runnerRegistrationTokens.getFirstRegistrationToken()
peertubeRunner = new PeerTubeRunnerProcess(server)
await peertubeRunner.runServer()
await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: buildUUID() })
}
before(async function () {
this.timeout(120_000)
server = await createSingleServer(1)
await setAccessTokensToServers([ server ])
await setDefaultVideoChannel([ server ])
await server.config.enableTranscoding()
await server.config.enableRemoteTranscoding()
await runRunner()
})
it('Should graceful shutdown the runner when it has no processing jobs', async function () {
await peertubeRunner.gracefulShutdown()
while (!peertubeRunner.hasCorrectlyExited()) {
await wait(500)
}
})
it('Should graceful shutdown the runner with many jobs to process', async function () {
await runRunner()
await server.videos.quickUpload({ name: 'video 1' })
await server.videos.quickUpload({ name: 'video 2' })
let processingJobs: RunnerJob[] = []
while (processingJobs.length === 0) {
await wait(500)
const { data } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.PROCESSING ] })
processingJobs = data
}
await peertubeRunner.gracefulShutdown()
while (!peertubeRunner.hasCorrectlyExited()) {
await wait(500)
}
// Check processed jobs are finished
const { data } = await server.runnerJobs.list({ count: 50 })
for (const job of processingJobs) {
expect(data.find(j => j.uuid === job.uuid).state.id).to.equal(RunnerJobState.COMPLETED)
}
// Check there are remaining jobs to process
const { data: pendingJobs } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] })
expect(pendingJobs).to.not.have.lengthOf(0)
})
after(async function () {
peertubeRunner.kill()
await cleanupTests([ server ])
})
})

View File

@ -89,14 +89,28 @@ export class PeerTubeRunnerProcess {
return stdout return stdout
} }
// ---------------------------------------------------------------------------
gracefulShutdown () {
const args = [ 'graceful-shutdown', ...this.buildIdArg() ]
return this.runCommand(this.getRunnerPath(), args)
}
hasCorrectlyExited () {
return this.app.exitCode === 0
}
kill () { kill () {
if (!this.app) return if (!this.app || this.app.exitCode !== null) return
process.kill(this.app.pid) process.kill(this.app.pid)
this.app = null this.app = null
} }
// ---------------------------------------------------------------------------
getId () { getId () {
return 'test-' + this.server.internalServerNumber return 'test-' + this.server.internalServerNumber
} }