From 33a68f74dd6d0f18e459511a68019c77492afc4e Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 12 Dec 2024 11:29:43 +0100 Subject: [PATCH] Implement graceful shutdown in runner --- apps/peertube-runner/src/peertube-runner.ts | 13 +++ apps/peertube-runner/src/register/shutdown.ts | 10 +++ apps/peertube-runner/src/server/server.ts | 34 +++++++- .../src/shared/ipc/ipc-client.ts | 21 ++++- .../src/shared/ipc/ipc-server.ts | 16 ++-- .../shared/ipc/shared/ipc-request.model.ts | 5 +- .../shared/ipc/shared/ipc-response.model.ts | 4 +- packages/node-utils/src/uuid.ts | 18 ++-- packages/tests/src/peertube-runner/index.ts | 1 + .../tests/src/peertube-runner/shutdown.ts | 85 +++++++++++++++++++ .../src/shared/peertube-runner-process.ts | 16 +++- 11 files changed, 193 insertions(+), 30 deletions(-) create mode 100644 apps/peertube-runner/src/register/shutdown.ts create mode 100644 packages/tests/src/peertube-runner/shutdown.ts diff --git a/apps/peertube-runner/src/peertube-runner.ts b/apps/peertube-runner/src/peertube-runner.ts index 45787a4b2..25349cd20 100644 --- a/apps/peertube-runner/src/peertube-runner.ts +++ b/apps/peertube-runner/src/peertube-runner.ts @@ -3,6 +3,7 @@ import { Command, InvalidArgumentError } from '@commander-js/extra-typings' import { RunnerJobType } from '@peertube/peertube-models' import { listRegistered, registerRunner, unregisterRunner } from './register/index.js' +import { gracefulShutdown } from './register/shutdown.js' import { RunnerServer } from './server/index.js' import { getSupportedJobsList } from './server/shared/supported-job.js' import { ConfigManager, logger } from './shared/index.js' @@ -98,6 +99,18 @@ program.command('list-registered') } }) +program.command('graceful-shutdown') + .description('Exit runner when all processing tasks are finished') + .action(async () => { + try { + await gracefulShutdown() + } catch (err) { + console.error('Cannot graceful shutdown the runner.') + console.error(err) + process.exit(-1) + } + }) + program.parse() // --------------------------------------------------------------------------- diff --git a/apps/peertube-runner/src/register/shutdown.ts b/apps/peertube-runner/src/register/shutdown.ts new file mode 100644 index 000000000..bd9e33720 --- /dev/null +++ b/apps/peertube-runner/src/register/shutdown.ts @@ -0,0 +1,10 @@ +import { IPCClient } from '../shared/ipc/index.js' + +export async function gracefulShutdown () { + const client = new IPCClient() + await client.run() + + await client.askGracefulShutdown() + + client.stop() +} diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts index 56239d6f7..6c58915ef 100644 --- a/apps/peertube-runner/src/server/server.ts +++ b/apps/peertube-runner/src/server/server.ts @@ -23,6 +23,7 @@ export class RunnerServer { private checkingAvailableJobs = false + private gracefulShutdown = false private cleaningUp = false private initialized = false @@ -182,6 +183,15 @@ export class RunnerServer { // --------------------------------------------------------------------------- + requestGracefulShutdown () { + logger.info('Received graceful shutdown request') + + this.gracefulShutdown = true + this.exitGracefullyIfNoProcessingJobs() + } + + // --------------------------------------------------------------------------- + private safeAsyncCheckAvailableJobs () { this.checkAvailableJobs() .catch(err => logger.error({ err }, `Cannot check available jobs`)) @@ -190,6 +200,7 @@ export class RunnerServer { private async checkAvailableJobs () { if (!this.initialized) return if (this.checkingAvailableJobs) return + if (this.gracefulShutdown) return this.checkingAvailableJobs = true @@ -260,9 +271,12 @@ export class RunnerServer { private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { if (!this.canProcessMoreJobs()) { - logger.info( - `Do not process more jobs (processing ${this.processingJobs.length} / ${ConfigManager.Instance.getConfig().jobs.concurrency})` - ) + if (!this.gracefulShutdown) { + logger.info( + `Do not process more jobs (processing ${this.processingJobs.length} / ${ConfigManager.Instance.getConfig().jobs.concurrency})` + ) + } + return } @@ -281,6 +295,8 @@ export class RunnerServer { .finally(() => { this.processingJobs = this.processingJobs.filter(p => p !== processingJob) + if (this.gracefulShutdown) this.exitGracefullyIfNoProcessingJobs() + return this.checkAvailableJobs() }) } @@ -296,6 +312,9 @@ export class RunnerServer { } private canProcessMoreJobs () { + if (this.cleaningUp) return false + if (this.gracefulShutdown) return false + return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency } @@ -309,6 +328,15 @@ export class RunnerServer { } } + private exitGracefullyIfNoProcessingJobs () { + if (this.processingJobs.length !== 0) return + + logger.info('Shutting down the runner after graceful shutdown request') + + this.onExit() + .catch(err => logger.error({ err }, 'Cannot exit runner')) + } + private async onExit () { if (this.cleaningUp) return this.cleaningUp = true diff --git a/apps/peertube-runner/src/shared/ipc/ipc-client.ts b/apps/peertube-runner/src/shared/ipc/ipc-client.ts index e418323a4..90575dbb4 100644 --- a/apps/peertube-runner/src/shared/ipc/ipc-client.ts +++ b/apps/peertube-runner/src/shared/ipc/ipc-client.ts @@ -2,7 +2,7 @@ import CliTable3 from 'cli-table3' import { ensureDir } from 'fs-extra/esm' import { Client as NetIPC } from '@peertube/net-ipc' import { ConfigManager } from '../config-manager.js' -import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js' +import { IPCResponse, IPCResponseData, IPCRequest } from './shared/index.js' export class IPCClient { private netIPC: NetIPC @@ -39,7 +39,7 @@ export class IPCClient { ...options } - const { success, error } = await this.netIPC.request(req) as IPCReponse + const { success, error } = await this.netIPC.request(req) as IPCResponse if (success) console.log('PeerTube instance registered') else console.error('Could not register PeerTube instance on runner server side', error) @@ -54,7 +54,7 @@ export class IPCClient { ...options } - const { success, error } = await this.netIPC.request(req) as IPCReponse + const { success, error } = await this.netIPC.request(req) as IPCResponse if (success) console.log('PeerTube instance unregistered') else console.error('Could not unregister PeerTube instance on runner server side', error) @@ -65,7 +65,7 @@ export class IPCClient { type: 'list-registered' } - const { success, error, data } = await this.netIPC.request(req) as IPCReponse + const { success, error, data } = await this.netIPC.request(req) as IPCResponse if (!success) { console.error('Could not list registered PeerTube instances', error) return @@ -82,6 +82,19 @@ export class IPCClient { console.log(table.toString()) } + // --------------------------------------------------------------------------- + + async askGracefulShutdown () { + const req: IPCRequest = { type: 'graceful-shutdown' } + + const { success, error } = await this.netIPC.request(req) as IPCResponse + + if (success) console.log('Graceful shutdown acknowledged by the runner') + else console.error('Could not graceful shutdown runner', error) + } + + // --------------------------------------------------------------------------- + stop () { this.netIPC.destroy() } diff --git a/apps/peertube-runner/src/shared/ipc/ipc-server.ts b/apps/peertube-runner/src/shared/ipc/ipc-server.ts index 4a4663329..ff03d5aef 100644 --- a/apps/peertube-runner/src/shared/ipc/ipc-server.ts +++ b/apps/peertube-runner/src/shared/ipc/ipc-server.ts @@ -4,7 +4,7 @@ import { pick } from '@peertube/peertube-core-utils' import { RunnerServer } from '../../server/index.js' import { ConfigManager } from '../config-manager.js' import { logger } from '../logger.js' -import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js' +import { IPCResponse, IPCResponseData, IPCRequest } from './shared/index.js' export class IPCServer { private netIPC: NetIPC @@ -25,10 +25,10 @@ export class IPCServer { try { const data = await this.process(req) - this.sendReponse(res, { success: true, data }) + this.sendResponse(res, { success: true, data }) } catch (err) { - logger.error('Cannot execute RPC call', err) - this.sendReponse(res, { success: false, error: err.message }) + logger.error({ err }, 'Cannot execute RPC call') + this.sendResponse(res, { success: false, error: err.message }) } }) } @@ -46,14 +46,18 @@ export class IPCServer { case 'list-registered': return Promise.resolve(this.runnerServer.listRegistered()) + case 'graceful-shutdown': + this.runnerServer.requestGracefulShutdown() + return undefined + default: throw new Error('Unknown RPC call ' + (req as any).type) } } - private sendReponse ( + private sendResponse ( response: (data: any) => Promise, - body: IPCReponse + body: IPCResponse ) { response(body) .catch(err => logger.error('Cannot send response after IPC request', err)) diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts index 352808c74..2ea1b71b2 100644 --- a/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts +++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts @@ -1,7 +1,8 @@ export type IPCRequest = IPCRequestRegister | IPCRequestUnregister | - IPCRequestListRegistered + IPCRequestListRegistered | + IPCRequestGracefulShutdown export type IPCRequestRegister = { type: 'register' @@ -13,3 +14,5 @@ export type IPCRequestRegister = { export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string } export type IPCRequestListRegistered = { type: 'list-registered' } + +export type IPCRequestGracefulShutdown = { type: 'graceful-shutdown' } diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts index 689d6e09a..475586778 100644 --- a/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts +++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts @@ -1,10 +1,10 @@ -export type IPCReponse = { +export type IPCResponse = { success: boolean error?: string data?: T } -export type IPCReponseData = +export type IPCResponseData = // list registered { servers: { diff --git a/packages/node-utils/src/uuid.ts b/packages/node-utils/src/uuid.ts index 68110eb0e..4eb5c49d7 100644 --- a/packages/node-utils/src/uuid.ts +++ b/packages/node-utils/src/uuid.ts @@ -2,38 +2,30 @@ import short, { SUUID } from 'short-uuid' const translator = short() -function buildUUID () { +export function buildUUID () { return short.uuid() } -function buildSUUID (): SUUID { +export function buildSUUID (): SUUID { return short.generate() } -function uuidToShort (uuid: string) { +export function uuidToShort (uuid: string) { if (!uuid) return uuid return translator.fromUUID(uuid) } -function shortToUUID (shortUUID: string) { +export function shortToUUID (shortUUID: string) { if (!shortUUID) return shortUUID return translator.toUUID(shortUUID) } -function isShortUUID (value: string) { +export function isShortUUID (value: string) { if (!value) return false return value.length === translator.maxLength } -export { - buildUUID, - buildSUUID, - uuidToShort, - shortToUUID, - isShortUUID -} - export type { SUUID } diff --git a/packages/tests/src/peertube-runner/index.ts b/packages/tests/src/peertube-runner/index.ts index 7807f8479..c046e1ed2 100644 --- a/packages/tests/src/peertube-runner/index.ts +++ b/packages/tests/src/peertube-runner/index.ts @@ -1,6 +1,7 @@ export * from './client-cli.js' export * from './live-transcoding.js' export * from './replace-file.js' +export * from './shutdown.js' export * from './studio-transcoding.js' export * from './video-transcription.js' export * from './vod-transcoding.js' diff --git a/packages/tests/src/peertube-runner/shutdown.ts b/packages/tests/src/peertube-runner/shutdown.ts new file mode 100644 index 000000000..0101e918f --- /dev/null +++ b/packages/tests/src/peertube-runner/shutdown.ts @@ -0,0 +1,85 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import { wait } from '@peertube/peertube-core-utils' +import { RunnerJob, RunnerJobState } from '@peertube/peertube-models' +import { buildUUID } from '@peertube/peertube-node-utils' +import { + cleanupTests, + createSingleServer, + PeerTubeServer, + setAccessTokensToServers, + setDefaultVideoChannel +} from '@peertube/peertube-server-commands' +import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js' +import { expect } from 'chai' + +describe('Test peertube-runner shutdown', function () { + let server: PeerTubeServer + let peertubeRunner: PeerTubeRunnerProcess + + async function runRunner () { + const registrationToken = await server.runnerRegistrationTokens.getFirstRegistrationToken() + + peertubeRunner = new PeerTubeRunnerProcess(server) + await peertubeRunner.runServer() + await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: buildUUID() }) + } + + before(async function () { + this.timeout(120_000) + + server = await createSingleServer(1) + + await setAccessTokensToServers([ server ]) + await setDefaultVideoChannel([ server ]) + + await server.config.enableTranscoding() + await server.config.enableRemoteTranscoding() + await runRunner() + }) + + it('Should graceful shutdown the runner when it has no processing jobs', async function () { + await peertubeRunner.gracefulShutdown() + + while (!peertubeRunner.hasCorrectlyExited()) { + await wait(500) + } + }) + + it('Should graceful shutdown the runner with many jobs to process', async function () { + await runRunner() + + await server.videos.quickUpload({ name: 'video 1' }) + await server.videos.quickUpload({ name: 'video 2' }) + + let processingJobs: RunnerJob[] = [] + while (processingJobs.length === 0) { + await wait(500) + + const { data } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.PROCESSING ] }) + processingJobs = data + } + + await peertubeRunner.gracefulShutdown() + + while (!peertubeRunner.hasCorrectlyExited()) { + await wait(500) + } + + // Check processed jobs are finished + const { data } = await server.runnerJobs.list({ count: 50 }) + for (const job of processingJobs) { + expect(data.find(j => j.uuid === job.uuid).state.id).to.equal(RunnerJobState.COMPLETED) + } + + // Check there are remaining jobs to process + const { data: pendingJobs } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] }) + expect(pendingJobs).to.not.have.lengthOf(0) + }) + + after(async function () { + peertubeRunner.kill() + + await cleanupTests([ server ]) + }) +}) diff --git a/packages/tests/src/shared/peertube-runner-process.ts b/packages/tests/src/shared/peertube-runner-process.ts index b4ba27d2f..37ea350fd 100644 --- a/packages/tests/src/shared/peertube-runner-process.ts +++ b/packages/tests/src/shared/peertube-runner-process.ts @@ -89,14 +89,28 @@ export class PeerTubeRunnerProcess { return stdout } + // --------------------------------------------------------------------------- + + gracefulShutdown () { + const args = [ 'graceful-shutdown', ...this.buildIdArg() ] + + return this.runCommand(this.getRunnerPath(), args) + } + + hasCorrectlyExited () { + return this.app.exitCode === 0 + } + kill () { - if (!this.app) return + if (!this.app || this.app.exitCode !== null) return process.kill(this.app.pid) this.app = null } + // --------------------------------------------------------------------------- + getId () { return 'test-' + this.server.internalServerNumber }