From 405c83f9af377a663a4c8e9ad025fd5c10496922 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 8 Sep 2022 12:26:46 +0200 Subject: [PATCH] Use worker thread to send HTTP requests Compute HTTP signature could be CPU intensive --- server/helpers/requests.ts | 2 + server/initializers/constants.ts | 8 +++ .../handlers/activitypub-http-broadcast.ts | 54 ++++++++++--------- server/lib/job-queue/job-queue.ts | 6 +-- server/lib/worker/parent-process.ts | 37 ++++++++++++- server/lib/worker/workers/http-broadcast.ts | 32 +++++++++++ 6 files changed, 110 insertions(+), 29 deletions(-) create mode 100644 server/lib/worker/workers/http-broadcast.ts diff --git a/server/helpers/requests.ts b/server/helpers/requests.ts index 0756beb16..1a3cc1b5b 100644 --- a/server/helpers/requests.ts +++ b/server/helpers/requests.ts @@ -192,6 +192,8 @@ async function findLatestRedirection (url: string, options: PeerTubeRequestOptio // --------------------------------------------------------------------------- export { + PeerTubeRequestOptions, + doRequest, doJSONRequest, doRequestAndSaveToFile, diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index c2289ef36..f7d9a41da 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -786,6 +786,14 @@ const WORKER_THREADS = { PROCESS_IMAGE: { CONCURRENCY: 1, MAX_THREADS: 5 + }, + SEQUENTIAL_HTTP_BROADCAST: { + CONCURRENCY: 1, + MAX_THREADS: 1 + }, + PARALLEL_HTTP_BROADCAST: { + CONCURRENCY: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'], + MAX_THREADS: 1 } } diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 13eff5211..733c1378a 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,39 +1,28 @@ -import { map } from 'bluebird' import { Job } from 'bullmq' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' +import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' import { ActivitypubHttpBroadcastPayload } from '@shared/models' import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' -async function processActivityPubHttpBroadcast (job: Job) { +// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive + +async function processActivityPubHttpSequentialBroadcast (job: Job) { logger.info('Processing ActivityPub broadcast in job %s.', job.id) - const payload = job.data as ActivitypubHttpBroadcastPayload + const requestOptions = await buildRequestOptions(job.data) - const body = await computeBody(payload) - const httpSignatureOptions = await buildSignedRequestOptions(payload) + const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) - const options = { - method: 'POST' as 'POST', - json: body, - httpSignature: httpSignatureOptions, - headers: buildGlobalHeaders(body) - } + return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) +} - const badUrls: string[] = [] - const goodUrls: string[] = [] +async function processActivityPubParallelHttpBroadcast (job: Job) { + logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id) - await map(payload.uris, async uri => { - try { - await doRequest(uri, options) - goodUrls.push(uri) - } catch (err) { - logger.debug('HTTP broadcast to %s failed.', uri, { err }) - badUrls.push(uri) - } - }, { concurrency: BROADCAST_CONCURRENCY }) + const requestOptions = await buildRequestOptions(job.data) + + const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) } @@ -41,5 +30,20 @@ async function processActivityPubHttpBroadcast (job: Job) { // --------------------------------------------------------------------------- export { - processActivityPubHttpBroadcast + processActivityPubHttpSequentialBroadcast, + processActivityPubParallelHttpBroadcast +} + +// --------------------------------------------------------------------------- + +async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) { + const body = await computeBody(payload) + const httpSignatureOptions = await buildSignedRequestOptions(payload) + + return { + method: 'POST' as 'POST', + json: body, + httpSignature: httpSignatureOptions, + headers: buildGlobalHeaders(body) + } } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0fcaaf466..e54d12acd 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -45,7 +45,7 @@ import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_ import { Hooks } from '../plugins/hooks' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' -import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { refreshAPObject } from './handlers/activitypub-refresher' @@ -96,8 +96,8 @@ export type CreateJobOptions = { } const handlers: { [id in JobType]: (job: Job) => Promise } = { - 'activitypub-http-broadcast': processActivityPubHttpBroadcast, - 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast, + 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, + 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts index 4bc7f2620..7d4102047 100644 --- a/server/lib/worker/parent-process.ts +++ b/server/lib/worker/parent-process.ts @@ -2,6 +2,7 @@ import { join } from 'path' import Piscina from 'piscina' import { processImage } from '@server/helpers/image-utils' import { WORKER_THREADS } from '@server/initializers/constants' +import { httpBroadcast } from './workers/http-broadcast' import { downloadImage } from './workers/image-downloader' let downloadImageWorker: Piscina @@ -34,7 +35,41 @@ function processImageFromWorker (options: Parameters[0]): P return processImageWorker.run(options) } +// --------------------------------------------------------------------------- + +let parallelHTTPBroadcastWorker: Piscina + +function parallelHTTPBroadcastFromWorker (options: Parameters[0]): Promise> { + if (!parallelHTTPBroadcastWorker) { + parallelHTTPBroadcastWorker = new Piscina({ + filename: join(__dirname, 'workers', 'http-broadcast.js'), + concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY, + maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS + }) + } + + return parallelHTTPBroadcastWorker.run(options) +} + +// --------------------------------------------------------------------------- + +let sequentialHTTPBroadcastWorker: Piscina + +function sequentialHTTPBroadcastFromWorker (options: Parameters[0]): Promise> { + if (!sequentialHTTPBroadcastWorker) { + sequentialHTTPBroadcastWorker = new Piscina({ + filename: join(__dirname, 'workers', 'http-broadcast.js'), + concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY, + maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS + }) + } + + return sequentialHTTPBroadcastWorker.run(options) +} + export { downloadImageFromWorker, - processImageFromWorker + processImageFromWorker, + parallelHTTPBroadcastFromWorker, + sequentialHTTPBroadcastFromWorker } diff --git a/server/lib/worker/workers/http-broadcast.ts b/server/lib/worker/workers/http-broadcast.ts new file mode 100644 index 000000000..8c9c6b8ca --- /dev/null +++ b/server/lib/worker/workers/http-broadcast.ts @@ -0,0 +1,32 @@ +import { map } from 'bluebird' +import { logger } from '@server/helpers/logger' +import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests' +import { BROADCAST_CONCURRENCY } from '@server/initializers/constants' + +async function httpBroadcast (payload: { + uris: string[] + requestOptions: PeerTubeRequestOptions +}) { + const { uris, requestOptions } = payload + + const badUrls: string[] = [] + const goodUrls: string[] = [] + + await map(uris, async uri => { + try { + await doRequest(uri, requestOptions) + goodUrls.push(uri) + } catch (err) { + logger.debug('HTTP broadcast to %s failed.', uri, { err }) + badUrls.push(uri) + } + }, { concurrency: BROADCAST_CONCURRENCY }) + + return { goodUrls, badUrls } +} + +module.exports = httpBroadcast + +export { + httpBroadcast +}