diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 678b0674b..2288bcd3f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -77,7 +77,7 @@ jobs: - name: Run Test # external-plugins tests only run on schedule - if: github.event_name == 'schedule' || matrix.test_suite != 'external-plugins' + # if: github.event_name == 'schedule' || matrix.test_suite != 'external-plugins' env: AKISMET_KEY: ${{ secrets.AKISMET_KEY }} run: npm run ci -- ${{ matrix.test_suite }} diff --git a/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html b/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html index 7858b4bca..d42f600e0 100644 --- a/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html +++ b/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html @@ -84,7 +84,7 @@
{{ runnerJob.privatePayload }}
-
{{ runnerJob.error }}
+
{{ runnerJob.error }}
diff --git a/packages/peertube-runner/README.md b/packages/peertube-runner/README.md index b7cf174d5..87c3c5354 100644 --- a/packages/peertube-runner/README.md +++ b/packages/peertube-runner/README.md @@ -1 +1,23 @@ # PeerTube runner + +Runner program to execute jobs (transcoding...) of remote PeerTube instances. + +Commands below has to be run at the root of PeerTube git repository. + +## Develop + +```bash +npm run dev:peertube-runner +``` + +## Build + +```bash +npm run build:peertube-runner +``` + +## Run + +```bash +node packages/peertube-runner/dist/peertube-runner.js --help +``` diff --git a/packages/peertube-runner/peertube-runner.ts b/packages/peertube-runner/peertube-runner.ts index 6bfd9ac0f..f02526ef9 100644 --- a/packages/peertube-runner/peertube-runner.ts +++ b/packages/peertube-runner/peertube-runner.ts @@ -26,7 +26,7 @@ program.command('server') try { await RunnerServer.Instance.run() } catch (err) { - console.error('Cannot run PeerTube runner as server mode', err) + logger.error('Cannot run PeerTube runner as server mode', err) process.exit(-1) } }) @@ -41,7 +41,7 @@ program.command('register') try { await registerRunner(options) } catch (err) { - console.error('Cannot register this PeerTube runner.', err) + logger.error('Cannot register this PeerTube runner.', err) process.exit(-1) } }) @@ -53,7 +53,7 @@ program.command('unregister') try { await unregisterRunner(options) } catch (err) { - console.error('Cannot unregister this PeerTube runner.', err) + logger.error('Cannot unregister this PeerTube runner.', err) process.exit(-1) } }) @@ -64,7 +64,7 @@ program.command('list-registered') try { await listRegistered() } catch (err) { - console.error('Cannot list registered PeerTube instances.', err) + logger.error('Cannot list registered PeerTube instances.', err) process.exit(-1) } }) diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts index 5a3b596a2..b17b51c7c 100644 --- a/packages/peertube-runner/server/process/shared/process-live.ts +++ b/packages/peertube-runner/server/process/shared/process-live.ts @@ -204,8 +204,8 @@ export class ProcessLiveRTMPHLSTranscoding { // --------------------------------------------------------------------------- - private sendDeletedChunkUpdate (deletedChunk: string) { - if (this.ended) return + private sendDeletedChunkUpdate (deletedChunk: string): Promise { + if (this.ended) return Promise.resolve() logger.debug(`Sending removed live chunk ${deletedChunk} update`) @@ -230,8 +230,8 @@ export class ProcessLiveRTMPHLSTranscoding { return this.updateWithRetry(payload) } - private sendAddedChunkUpdate (addedChunk: string) { - if (this.ended) return + private sendAddedChunkUpdate (addedChunk: string): Promise { + if (this.ended) return Promise.resolve() logger.debug(`Sending added live chunk ${addedChunk} update`) @@ -257,7 +257,7 @@ export class ProcessLiveRTMPHLSTranscoding { return this.updateWithRetry(payload) } - private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) { + private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { if (this.ended || this.errored) return try { diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index 724f359bd..e851dfc7c 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts @@ -23,6 +23,8 @@ export class RunnerServer { private checkingAvailableJobs = false + private cleaningUp = false + private readonly sockets = new Map() private constructor () {} @@ -45,13 +47,17 @@ export class RunnerServer { try { await ipcServer.run(this) } catch (err) { - console.error('Cannot start local socket for IPC communication', err) + logger.error('Cannot start local socket for IPC communication', err) process.exit(-1) } // Cleanup on exit for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { - process.on(code, async () => { + process.on(code, async (err, origin) => { + if (code === 'uncaughtException') { + logger.error({ err, origin }, 'uncaughtException') + } + await this.onExit() }) } @@ -244,6 +250,11 @@ export class RunnerServer { } private async onExit () { + if (this.cleaningUp) return + this.cleaningUp = true + + logger.info('Cleaning up after program exit') + try { for (const { server, job } of this.processingJobs) { await server.runnerJobs.abort({ @@ -256,7 +267,7 @@ export class RunnerServer { await this.cleanupTMP() } catch (err) { - console.error(err) + logger.error(err) process.exit(-1) } diff --git a/packages/peertube-runner/shared/http.ts b/packages/peertube-runner/shared/http.ts index d3fff70d1..df64dc168 100644 --- a/packages/peertube-runner/shared/http.ts +++ b/packages/peertube-runner/shared/http.ts @@ -47,7 +47,7 @@ export function downloadFile (options: { request.on('error', err => { remove(destination) - .catch(err => console.error(err)) + .catch(err => logger.error(err)) return rej(err) }) diff --git a/packages/peertube-runner/shared/ipc/ipc-server.ts b/packages/peertube-runner/shared/ipc/ipc-server.ts index bc340198b..922dc93e5 100644 --- a/packages/peertube-runner/shared/ipc/ipc-server.ts +++ b/packages/peertube-runner/shared/ipc/ipc-server.ts @@ -27,7 +27,7 @@ export class IPCServer { this.sendReponse(res, { success: true, data }) } catch (err) { - console.error('Cannot execute RPC call', err) + logger.error('Cannot execute RPC call', err) this.sendReponse(res, { success: false, error: err.message }) } }) @@ -56,6 +56,6 @@ export class IPCServer { body: IPCReponse ) { response(body) - .catch(err => console.error('Cannot send response after IPC request', err)) + .catch(err => logger.error('Cannot send response after IPC request', err)) } } diff --git a/scripts/build/peertube-runner.sh b/scripts/build/peertube-runner.sh index 690031af5..9c326747b 100755 --- a/scripts/build/peertube-runner.sh +++ b/scripts/build/peertube-runner.sh @@ -10,4 +10,4 @@ rm -rf ./dist rm -rf ./dist mkdir ./dist -./node_modules/.bin/esbuild ./peertube-runner.ts --bundle --platform=node --external:"./lib-cov/fluent-ffmpeg" --external:pg-hstore --outfile=dist/peertube-runner.js +./node_modules/.bin/esbuild ./peertube-runner.ts --bundle --platform=node --target=node14 --external:"./lib-cov/fluent-ffmpeg" --external:pg-hstore --outfile=dist/peertube-runner.js diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 21bf0f226..03f6fbea7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -96,6 +96,7 @@ export type CreateJobArgument = export type CreateJobOptions = { delay?: number priority?: number + failParentOnFailure?: boolean } const handlers: { [id in JobType]: (job: Job) => Promise } = { @@ -363,7 +364,11 @@ class JobQueue { name: 'job', data: job.payload, queueName: job.type, - opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) + opts: { + failParentOnFailure: true, + + ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ])) + } } } diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index f3f8fc886..ef4ecb83e 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -79,9 +79,7 @@ class MuxingSession extends EventEmitter { private streamingPlaylist: MStreamingPlaylistVideo private liveSegmentShaStore: LiveSegmentShaStore - private tsWatcher: FSWatcher - private masterWatcher: FSWatcher - private m3u8Watcher: FSWatcher + private filesWatcher: FSWatcher private masterPlaylistCreated = false private liveReady = false @@ -149,6 +147,8 @@ class MuxingSession extends EventEmitter { await this.transcodingWrapper.run() + this.filesWatcher = watch(this.outDirectory, { depth: 0 }) + this.watchMasterFile() this.watchTSFiles() this.watchM3U8File() @@ -168,9 +168,10 @@ class MuxingSession extends EventEmitter { } private watchMasterFile () { - this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) + this.filesWatcher.on('add', async path => { + if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return + if (this.masterPlaylistCreated === true) return - this.masterWatcher.on('add', async () => { try { if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) @@ -188,20 +189,18 @@ class MuxingSession extends EventEmitter { this.masterPlaylistCreated = true logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) - - this.masterWatcher.close() - .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) }) } private watchM3U8File () { - this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') - const sendQueues = new Map() - const onChangeOrAdd = async (m3u8Path: string) => { + const onChange = async (m3u8Path: string) => { + if (m3u8Path.endsWith('.m3u8') !== true) return if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return + logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) + try { if (!sendQueues.has(m3u8Path)) { sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) @@ -214,18 +213,18 @@ class MuxingSession extends EventEmitter { } } - this.m3u8Watcher.on('change', onChangeOrAdd) + this.filesWatcher.on('change', onChange) } private watchTSFiles () { const startStreamDateTime = new Date().getTime() - this.tsWatcher = watch(this.outDirectory + '/*.ts') - const playlistIdMatcher = /^([\d+])-/ const addHandler = async (segmentPath: string) => { - logger.debug('Live add handler of %s.', segmentPath, this.lTags()) + if (segmentPath.endsWith('.ts') !== true) return + + logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] @@ -252,6 +251,10 @@ class MuxingSession extends EventEmitter { } const deleteHandler = async (segmentPath: string) => { + if (segmentPath.endsWith('.ts') !== true) return + + logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags()) + try { await this.liveSegmentShaStore.removeSegmentSha(segmentPath) } catch (err) { @@ -267,8 +270,8 @@ class MuxingSession extends EventEmitter { } } - this.tsWatcher.on('add', p => addHandler(p)) - this.tsWatcher.on('unlink', p => deleteHandler(p)) + this.filesWatcher.on('add', p => addHandler(p)) + this.filesWatcher.on('unlink', p => deleteHandler(p)) } private async isQuotaExceeded (segmentPath: string) { @@ -371,7 +374,8 @@ class MuxingSession extends EventEmitter { setTimeout(() => { // Wait latest segments generation, and close watchers - Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) + const promise = this.filesWatcher?.close() || Promise.resolve() + promise .then(() => { // Process remaining segments hash for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts index 73fc14574..74b455107 100644 --- a/server/lib/runners/job-handlers/abstract-job-handler.ts +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts @@ -21,6 +21,7 @@ import { RunnerJobVODWebVideoTranscodingPayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models' +import { throttle } from 'lodash' type CreateRunnerJobArg = { @@ -48,6 +49,8 @@ export abstract class AbstractJobHandler @@ -102,16 +105,19 @@ export abstract class AbstractJobHandler { return sequelizeTypescript.transaction(async transaction => { - if (runnerJob.changed()) { - return runnerJob.save({ transaction }) - } - - // Don't update the job too often - if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { - await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) - } + return runnerJob.save({ transaction }) }) }) } diff --git a/server/tests/api/videos/resumable-upload.ts b/server/tests/api/videos/resumable-upload.ts index a70a7258b..2fbefb392 100644 --- a/server/tests/api/videos/resumable-upload.ts +++ b/server/tests/api/videos/resumable-upload.ts @@ -79,7 +79,7 @@ describe('Test resumable upload', function () { async function checkFileSize (uploadIdArg: string, expectedSize: number | null) { const uploadId = uploadIdArg.replace(/^upload_id=/, '') - const subPath = join('tmp', 'resumable-uploads', uploadId) + const subPath = join('tmp', 'resumable-uploads', `${rootId}-${uploadId}.mp4`) const filePath = server.servers.buildDirectory(subPath) const exists = await pathExists(filePath) diff --git a/server/tests/cli/create-import-video-file-job.ts b/server/tests/cli/create-import-video-file-job.ts index 43f53035b..3ece4f2ec 100644 --- a/server/tests/cli/create-import-video-file-job.ts +++ b/server/tests/cli/create-import-video-file-job.ts @@ -73,7 +73,7 @@ function runTests (objectStorage: boolean) { }) it('Should run a import job on video 1 with a lower resolution', async function () { - const command = `npm run create-import-video-file-job -- -v ${video1ShortId} -i server/tests/fixtures/video_short-480.webm` + const command = `npm run create-import-video-file-job -- -v ${video1ShortId} -i server/tests/fixtures/video_short_480.webm` await servers[0].cli.execWithEnv(command) await waitJobs(servers) diff --git a/shared/server-commands/requests/requests.ts b/shared/server-commands/requests/requests.ts index 96f67b4c7..e3f1817f1 100644 --- a/shared/server-commands/requests/requests.ts +++ b/shared/server-commands/requests/requests.ts @@ -159,7 +159,7 @@ function unwrapBodyOrDecodeToJSON (test: request.Test): Promise { try { return JSON.parse(new TextDecoder().decode(res.body)) } catch (err) { - console.error('Cannot decode JSON.', res.body instanceof Buffer ? res.body.toString() : res.body) + console.error('Cannot decode JSON.', { res, body: res.body instanceof Buffer ? res.body.toString() : res.body }) throw err } } @@ -168,7 +168,7 @@ function unwrapBodyOrDecodeToJSON (test: request.Test): Promise { try { return JSON.parse(res.text) } catch (err) { - console.error('Cannot decode json', res.text) + console.error('Cannot decode json', { res, text: res.text }) throw err } }