import { map } from 'bluebird' import { Job } from 'bull' import { isAnnounceActivityValid, isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/activity' import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' import { AP_CLEANER } from '@server/initializers/constants' import { checkUrlsSameHost } from '@server/lib/activitypub/url' import { Redis } from '@server/lib/redis' import { VideoModel } from '@server/models/video/video' import { VideoCommentModel } from '@server/models/video/video-comment' import { VideoShareModel } from '@server/models/video/video-share' import { HttpStatusCode } from '@shared/models' import { logger, loggerTagsFactory } from '../../../helpers/logger' import { AccountVideoRateModel } from '../../../models/account/account-video-rate' const lTags = loggerTagsFactory('ap-cleaner') // Job to clean remote interactions off local videos async function processActivityPubCleaner (_job: Job) { logger.info('Processing ActivityPub cleaner.', lTags()) { const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() const { bodyValidator, deleter, updater } = rateOptionsFactory() await map(rateUrls, async rateUrl => { const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) if (result?.status === 'deleted') { const { videoId, type } = result.data await VideoModel.syncLocalRates(videoId, type, undefined) } }, { concurrency: AP_CLEANER.CONCURRENCY }) } { const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() const { bodyValidator, deleter, updater } = shareOptionsFactory() await map(shareUrls, async shareUrl => { await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) }, { concurrency: AP_CLEANER.CONCURRENCY }) } { const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() const { bodyValidator, deleter, updater } = commentOptionsFactory() await map(commentUrls, async commentUrl => { await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) }, { concurrency: AP_CLEANER.CONCURRENCY }) } } // --------------------------------------------------------------------------- export { processActivityPubCleaner } // --------------------------------------------------------------------------- async function updateObjectIfNeeded (options: { url: string bodyValidator: (body: any) => boolean updater: (url: string, newUrl: string) => Promise deleter: (url: string) => Promise } ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { const { url, bodyValidator, updater, deleter } = options const on404OrTombstone = async () => { logger.info('Removing remote AP object %s.', url, lTags(url)) const data = await deleter(url) return { status: 'deleted' as 'deleted', data } } try { const { body } = await doJSONRequest(url, { activityPub: true }) // If not same id, check same host and update if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) if (body.type === 'Tombstone') { return on404OrTombstone() } const newUrl = body.id if (newUrl !== url) { if (checkUrlsSameHost(newUrl, url) !== true) { throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) } logger.info('Updating remote AP object %s.', url, lTags(url)) const data = await updater(url, newUrl) return { status: 'updated', data } } return null } catch (err) { // Does not exist anymore, remove entry if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) { return on404OrTombstone() } logger.debug('Remote AP object %s is unavailable.', url, lTags(url)) const unavailability = await Redis.Instance.addAPUnavailability(url) if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { logger.info('Removing unavailable AP resource %s.', url, lTags(url)) return on404OrTombstone() } return null } } function rateOptionsFactory () { return { bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body), updater: async (url: string, newUrl: string) => { const rate = await AccountVideoRateModel.loadByUrl(url, undefined) rate.url = newUrl const videoId = rate.videoId const type = rate.type await rate.save() return { videoId, type } }, deleter: async (url) => { const rate = await AccountVideoRateModel.loadByUrl(url, undefined) const videoId = rate.videoId const type = rate.type await rate.destroy() return { videoId, type } } } } function shareOptionsFactory () { return { bodyValidator: (body: any) => isAnnounceActivityValid(body), updater: async (url: string, newUrl: string) => { const share = await VideoShareModel.loadByUrl(url, undefined) share.url = newUrl await share.save() return undefined }, deleter: async (url) => { const share = await VideoShareModel.loadByUrl(url, undefined) await share.destroy() return undefined } } } function commentOptionsFactory () { return { bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body), updater: async (url: string, newUrl: string) => { const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) comment.url = newUrl await comment.save() return undefined }, deleter: async (url) => { const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) await comment.destroy() return undefined } } }