Use worker thread to send HTTP requests

Computing HTTP signatures can be CPU intensive
This commit is contained in:
Chocobozzz 2022-09-08 12:26:46 +02:00
parent d800ec5f36
commit 405c83f9af
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
6 changed files with 110 additions and 29 deletions

View File

@ -192,6 +192,8 @@ async function findLatestRedirection (url: string, options: PeerTubeRequestOptio
// ---------------------------------------------------------------------------
export {
PeerTubeRequestOptions,
doRequest,
doJSONRequest,
doRequestAndSaveToFile,

View File

@ -786,6 +786,14 @@ const WORKER_THREADS = {
PROCESS_IMAGE: {
CONCURRENCY: 1,
MAX_THREADS: 5
},
SEQUENTIAL_HTTP_BROADCAST: {
CONCURRENCY: 1,
MAX_THREADS: 1
},
PARALLEL_HTTP_BROADCAST: {
CONCURRENCY: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
MAX_THREADS: 1
}
}

View File

@ -1,39 +1,28 @@
import { map } from 'bluebird'
import { Job } from 'bullmq'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
import { ActivitypubHttpBroadcastPayload } from '@shared/models'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
async function processActivityPubHttpBroadcast (job: Job) {
// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive

/**
 * Job handler that broadcasts an ActivityPub payload through the sequential
 * HTTP broadcast worker pool, then records which target URLs succeeded or failed.
 *
 * The request options are built exactly once via `buildRequestOptions()`;
 * the body and signature options must not be recomputed inline here.
 *
 * @param job - queue job whose `data` holds the broadcast payload (including `uris`)
 * @returns the promise returned by `updateActorFollowsHealth()`
 */
async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub broadcast in job %s.', job.id)

  const requestOptions = await buildRequestOptions(job.data)

  // The requests themselves are sent from the worker thread
  const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
}
const badUrls: string[] = []
const goodUrls: string[] = []
/**
 * Job handler that broadcasts an ActivityPub payload through the parallel
 * HTTP broadcast worker pool, then records which target URLs succeeded or failed.
 *
 * @param job - queue job whose `data` holds the broadcast payload (including `uris`)
 * @returns the promise returned by `updateActorFollowsHealth()`
 */
async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)

  const requestOptions = await buildRequestOptions(job.data)

  // Must go through the parallel pool: routing these jobs to the sequential pool
  // would queue them behind sequential broadcasts and leave the parallel pool unused
  const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
}
@ -41,5 +30,20 @@ async function processActivityPubHttpBroadcast (job: Job) {
// ---------------------------------------------------------------------------
export {
processActivityPubHttpBroadcast
processActivityPubHttpSequentialBroadcast,
processActivityPubParallelHttpBroadcast
}
// ---------------------------------------------------------------------------
/**
 * Builds the options of the POST request sent to every broadcast target:
 * the JSON body computed from the payload, the HTTP signature options
 * and the global headers derived from that body.
 *
 * @param payload - broadcast job payload
 * @returns request options with `method`, `json`, `httpSignature` and `headers`
 */
async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
  const json = await computeBody(payload)
  const httpSignature = await buildSignedRequestOptions(payload)

  // Headers are derived from the computed body
  const headers = buildGlobalHeaders(json)

  return { method: 'POST' as 'POST', json, httpSignature, headers }
}

View File

@ -45,7 +45,7 @@ import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_
import { Hooks } from '../plugins/hooks'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { refreshAPObject } from './handlers/activitypub-refresher'
@ -96,8 +96,8 @@ export type CreateJobOptions = {
}
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,

View File

@ -2,6 +2,7 @@ import { join } from 'path'
import Piscina from 'piscina'
import { processImage } from '@server/helpers/image-utils'
import { WORKER_THREADS } from '@server/initializers/constants'
import { httpBroadcast } from './workers/http-broadcast'
import { downloadImage } from './workers/image-downloader'
let downloadImageWorker: Piscina
@ -34,7 +35,41 @@ function processImageFromWorker (options: Parameters<typeof processImage>[0]): P
return processImageWorker.run(options)
}
// ---------------------------------------------------------------------------
// Pool used for parallel HTTP broadcasts, created on first use
let parallelHTTPBroadcastWorker: Piscina

/**
 * Runs `httpBroadcast` in the parallel HTTP broadcast worker pool.
 * The pool is instantiated lazily on the first call and reused afterwards.
 *
 * @param options - argument forwarded as is to the worker task
 * @returns the promise returned by the pool's `run()`
 */
function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
  if (parallelHTTPBroadcastWorker === undefined) {
    const { CONCURRENCY, MAX_THREADS } = WORKER_THREADS.PARALLEL_HTTP_BROADCAST

    parallelHTTPBroadcastWorker = new Piscina({
      filename: join(__dirname, 'workers', 'http-broadcast.js'),
      concurrentTasksPerWorker: CONCURRENCY,
      maxThreads: MAX_THREADS
    })
  }

  return parallelHTTPBroadcastWorker.run(options)
}
// ---------------------------------------------------------------------------
// Pool used for sequential HTTP broadcasts, created on first use
let sequentialHTTPBroadcastWorker: Piscina

/**
 * Runs `httpBroadcast` in the sequential HTTP broadcast worker pool.
 * The pool is instantiated lazily on the first call and reused afterwards.
 *
 * @param options - argument forwarded as is to the worker task
 * @returns the promise returned by the pool's `run()`
 */
function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
  if (sequentialHTTPBroadcastWorker === undefined) {
    const { CONCURRENCY, MAX_THREADS } = WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST

    sequentialHTTPBroadcastWorker = new Piscina({
      filename: join(__dirname, 'workers', 'http-broadcast.js'),
      concurrentTasksPerWorker: CONCURRENCY,
      maxThreads: MAX_THREADS
    })
  }

  return sequentialHTTPBroadcastWorker.run(options)
}
export {
downloadImageFromWorker,
processImageFromWorker
processImageFromWorker,
parallelHTTPBroadcastFromWorker,
sequentialHTTPBroadcastFromWorker
}

View File

@ -0,0 +1,32 @@
import { map } from 'bluebird'
import { logger } from '@server/helpers/logger'
import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests'
import { BROADCAST_CONCURRENCY } from '@server/initializers/constants'
/**
 * Sends the same request to every URI, with at most `BROADCAST_CONCURRENCY`
 * requests in flight, and sorts the URIs by outcome.
 * A failing request is logged at debug level and never rejects the broadcast.
 *
 * @param payload - target `uris` and the `requestOptions` used for each request
 * @returns `goodUrls` (request succeeded) and `badUrls` (request threw)
 */
async function httpBroadcast (payload: {
  uris: string[]
  requestOptions: PeerTubeRequestOptions
}) {
  const { uris, requestOptions } = payload

  const goodUrls: string[] = []
  const badUrls: string[] = []

  // Sends one request and files the URI under the matching outcome
  const sendTo = async (uri: string) => {
    try {
      await doRequest(uri, requestOptions)
    } catch (err) {
      logger.debug('HTTP broadcast to %s failed.', uri, { err })
      badUrls.push(uri)
      return
    }

    goodUrls.push(uri)
  }

  await map(uris, uri => sendTo(uri), { concurrency: BROADCAST_CONCURRENCY })

  return { goodUrls, badUrls }
}
// NOTE(review): presumably assigned to module.exports so that a worker pool loading
// this file by path receives the function itself as the module's export —
// confirm against Piscina's worker-loading rules.
module.exports = httpBroadcast
// Named export so that other modules can import `httpBroadcast` (e.g. to derive its parameter/return types)
export {
httpBroadcast
}