// PeerTube/server/lib/activitypub/inbox-manager.ts
// (TypeScript, 61 lines, 1.7 KiB)
// 2020-12-15 06:34:58 -06:00

import { AsyncQueue, queue } from 'async'
import { logger } from '@server/helpers/logger'
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
import { MActorDefault, MActorSignature } from '@server/types/models'
import { Activity } from '@shared/models'
import { processActivities } from './process'
import { StatsManager } from '../stat-manager'
/**
 * One unit of work for the inbox queue.
 *
 * `activities` is handed to `processActivities`; the two optional actors are
 * forwarded to it unchanged as its options object.
 */
type QueueParam = {
  /** Activities to process together as a single queue task. */
  activities: Activity[]

  /** Forwarded as `options.signatureActor`; may be omitted. */
  signatureActor?: MActorSignature

  /** Forwarded as `options.inboxActor`; may be omitted. */
  inboxActor?: MActorDefault
}
/**
 * Singleton that funnels incoming inbox messages through an `async` queue and
 * periodically reports queue statistics to the `StatsManager`.
 *
 * Obtain the shared instance through `InboxManager.Instance`; the constructor
 * is private.
 */
class InboxManager {

  private static instance: InboxManager

  /** Queue of pending tasks; created without an explicit concurrency argument. */
  private readonly inboxQueue: AsyncQueue<QueueParam>

  /** Number of tasks whose processing has been started (successful or not). */
  private messagesProcessed = 0

  private constructor () {
    this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
      const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }

      // Counted as soon as the worker picks the task up, before the outcome is known.
      this.messagesProcessed++

      // Use the two-argument form of then() so that `cb` is invoked exactly once
      // per task: an exception thrown by `cb` itself must not be reported as a
      // processing failure nor trigger a second `cb` call.
      processActivities(task.activities, options)
        .then(
          () => cb(),
          err => {
            logger.error('Error in process activities.', { err })

            // Processing errors are logged only; the queue moves on to the next task.
            cb()
          }
        )
    })

    // Push the processed counter and the current backlog to the stats manager
    // at the configured interval.
    setInterval(() => {
      StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting())
    }, SCHEDULER_INTERVALS_MS.updateInboxStats)
  }

  /**
   * Enqueues a batch of activities for processing.
   *
   * A rejection of the promise returned by the queue's `push` is logged and
   * not propagated to the caller.
   *
   * @param options - activities to process and the optional actors to forward
   */
  addInboxMessage (options: QueueParam) {
    this.inboxQueue.push(options)
      .catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
  }

  /**
   * @returns the number of tasks still queued plus those currently running
   */
  getActivityPubMessagesWaiting () {
    return this.inboxQueue.length() + this.inboxQueue.running()
  }

  /** Lazily created shared instance. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
// ---------------------------------------------------------------------------
// Module exports

export { InboxManager }