Process unicast job in a worker too

Signing the request can take a long time
This commit is contained in:
Chocobozzz 2023-10-25 09:43:35 +02:00
parent 90db2b3aed
commit cb38deb288
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
3 changed files with 32 additions and 3 deletions

View File

@ -2,8 +2,8 @@ import { Job } from 'bullmq'
import { ActivitypubHttpUnicastPayload } from '@peertube/peertube-models' import { ActivitypubHttpUnicastPayload } from '@peertube/peertube-models'
import { buildGlobalHTTPHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/http.js' import { buildGlobalHTTPHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/http.js'
import { logger } from '../../../helpers/logger.js' import { logger } from '../../../helpers/logger.js'
import { doRequest } from '../../../helpers/requests.js'
import { ActorFollowHealthCache } from '../../actor-follow-health-cache.js' import { ActorFollowHealthCache } from '../../actor-follow-health-cache.js'
import { httpUnicastFromWorker } from '@server/lib/worker/parent-process.js'
async function processActivityPubHttpUnicast (job: Job) { async function processActivityPubHttpUnicast (job: Job) {
logger.info('Processing ActivityPub unicast in job %s.', job.id) logger.info('Processing ActivityPub unicast in job %s.', job.id)
@ -22,7 +22,7 @@ async function processActivityPubHttpUnicast (job: Job) {
} }
try { try {
await doRequest(uri, options) await httpUnicastFromWorker({ uri, requestOptions: options })
ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], []) ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], [])
} catch (err) { } catch (err) {
ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ]) ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ])

View File

@ -7,6 +7,7 @@ import type processImage from './workers/image-processor.js'
import type getImageSize from './workers/get-image-size.js' import type getImageSize from './workers/get-image-size.js'
import type signJsonLDObject from './workers/sign-json-ld-object.js' import type signJsonLDObject from './workers/sign-json-ld-object.js'
import type buildDigest from './workers/build-digest.js' import type buildDigest from './workers/build-digest.js'
import type httpUnicast from './workers/http-unicast.js'
let downloadImageWorker: Piscina let downloadImageWorker: Piscina
@ -92,6 +93,25 @@ export function sequentialHTTPBroadcastFromWorker (
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
let httpUnicastWorker: Piscina
export function httpUnicastFromWorker (
options: Parameters<typeof httpUnicast>[0]
): Promise<ReturnType<typeof httpUnicast>> {
if (!httpUnicastWorker) {
httpUnicastWorker = new Piscina({
filename: new URL(join('workers', 'http-unicast.js'), import.meta.url).href,
// Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-unicast'],
maxThreads: 1
})
}
return httpUnicastWorker.run(options)
}
// ---------------------------------------------------------------------------
let signJsonLDObjectWorker: Piscina let signJsonLDObjectWorker: Piscina
export function signJsonLDObjectFromWorker <T> ( export function signJsonLDObjectFromWorker <T> (
@ -100,7 +120,6 @@ export function signJsonLDObjectFromWorker <T> (
if (!signJsonLDObjectWorker) { if (!signJsonLDObjectWorker) {
signJsonLDObjectWorker = new Piscina({ signJsonLDObjectWorker = new Piscina({
filename: new URL(join('workers', 'sign-json-ld-object.js'), import.meta.url).href, filename: new URL(join('workers', 'sign-json-ld-object.js'), import.meta.url).href,
// Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
concurrentTasksPerWorker: WORKER_THREADS.SIGN_JSON_LD_OBJECT.CONCURRENCY, concurrentTasksPerWorker: WORKER_THREADS.SIGN_JSON_LD_OBJECT.CONCURRENCY,
maxThreads: WORKER_THREADS.SIGN_JSON_LD_OBJECT.MAX_THREADS maxThreads: WORKER_THREADS.SIGN_JSON_LD_OBJECT.MAX_THREADS
}) })

View File

@ -0,0 +1,10 @@
import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests.js'
async function httpUnicast (payload: {
uri: string
requestOptions: PeerTubeRequestOptions
}) {
await doRequest(payload.uri, payload.requestOptions)
}
export default httpUnicast