From d61b817890d5d5bba61d447518321870498028d8 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 14 Sep 2018 16:47:15 +0200 Subject: [PATCH] Process inbox activities in a queue --- server/controllers/activitypub/inbox.ts | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/server/controllers/activitypub/inbox.ts b/server/controllers/activitypub/inbox.ts index 20bd20ed4..738d155eb 100644 --- a/server/controllers/activitypub/inbox.ts +++ b/server/controllers/activitypub/inbox.ts @@ -7,6 +7,8 @@ import { asyncMiddleware, checkSignature, localAccountValidator, localVideoChann import { activityPubValidator } from '../../middlewares/validators/activitypub/activity' import { VideoChannelModel } from '../../models/video/video-channel' import { AccountModel } from '../../models/account/account' +import { queue } from 'async' +import { ActorModel } from '../../models/activitypub/actor' const inboxRouter = express.Router() @@ -14,7 +16,7 @@ inboxRouter.post('/inbox', signatureValidator, asyncMiddleware(checkSignature), asyncMiddleware(activityPubValidator), - asyncMiddleware(inboxController) + inboxController ) inboxRouter.post('/accounts/:name/inbox', @@ -22,14 +24,14 @@ inboxRouter.post('/accounts/:name/inbox', asyncMiddleware(checkSignature), asyncMiddleware(localAccountValidator), asyncMiddleware(activityPubValidator), - asyncMiddleware(inboxController) + inboxController ) inboxRouter.post('/video-channels/:name/inbox', signatureValidator, asyncMiddleware(checkSignature), asyncMiddleware(localVideoChannelValidator), asyncMiddleware(activityPubValidator), - asyncMiddleware(inboxController) + inboxController ) // --------------------------------------------------------------------------- @@ -40,7 +42,12 @@ export { // --------------------------------------------------------------------------- -async function inboxController (req: express.Request, res: express.Response, next: express.NextFunction) { +const inboxQueue = queue<{ activities: Activity[], signatureActor?: ActorModel, inboxActor?: ActorModel }, Error>((task, cb) => { + processActivities(task.activities, task.signatureActor, task.inboxActor) + .then(() => cb()) +}) + +function inboxController (req: express.Request, res: express.Response, next: express.NextFunction) { const rootActivity: RootActivity = req.body let activities: Activity[] = [] @@ -66,7 +73,11 @@ async function inboxController (req: express.Request, res: express.Response, nex logger.info('Receiving inbox requests for %d activities by %s.', activities.length, res.locals.signature.actor.url) - await processActivities(activities, res.locals.signature.actor, accountOrChannel ? accountOrChannel.Actor : undefined) + inboxQueue.push({ + activities, + signatureActor: res.locals.signature.actor, + inboxActor: accountOrChannel ? accountOrChannel.Actor : undefined + }) - res.status(204).end() + return res.status(204).end() }