diff --git a/client/src/app/+admin/system/jobs/jobs.component.ts b/client/src/app/+admin/system/jobs/jobs.component.ts index 9f5c04406..43578eedd 100644 --- a/client/src/app/+admin/system/jobs/jobs.component.ts +++ b/client/src/app/+admin/system/jobs/jobs.component.ts @@ -28,6 +28,7 @@ export class JobsComponent extends RestTable implements OnInit { 'activitypub-http-fetcher', 'activitypub-http-unicast', 'activitypub-refresher', + 'activitypub-cleaner', 'actor-keys', 'email', 'video-file-import', diff --git a/config/default.yaml b/config/default.yaml index 2d8afe1c3..a09d20b9d 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -192,6 +192,12 @@ federation: videos: federate_unlisted: false + # Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments) + # It removes objects that do not exist anymore, and potentially fix their URLs + # This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted + # We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes + cleanup_remote_interactions: false + cache: previews: size: 500 # Max number of previews you want to cache diff --git a/config/production.yaml.example b/config/production.yaml.example index 2794c543c..31c0e6b96 100644 --- a/config/production.yaml.example +++ b/config/production.yaml.example @@ -190,6 +190,12 @@ federation: videos: federate_unlisted: false + # Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments) + # It removes objects that do not exist anymore, and potentially fix their URLs + # This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted + # We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes + cleanup_remote_interactions: false + ############################################################################### # diff --git a/server/helpers/custom-validators/activitypub/activity.ts b/server/helpers/custom-validators/activitypub/activity.ts index 8b8c0685f..da79b2782 100644 --- a/server/helpers/custom-validators/activitypub/activity.ts +++ b/server/helpers/custom-validators/activitypub/activity.ts @@ -1,15 +1,16 @@ import validator from 'validator' import { Activity, ActivityType } from '../../../../shared/models/activitypub' +import { exists } from '../misc' import { sanitizeAndCheckActorObject } from './actor' +import { isCacheFileObjectValid } from './cache-file' +import { isFlagActivityValid } from './flag' import { isActivityPubUrlValid, isBaseActivityValid, isObjectValid } from './misc' -import { isDislikeActivityValid } from './rate' +import { isPlaylistObjectValid } from './playlist' +import { isDislikeActivityValid, isLikeActivityValid } from './rate' +import { isShareActivityValid } from './share' import { sanitizeAndCheckVideoCommentObject } from './video-comments' import { sanitizeAndCheckVideoTorrentObject } from './videos' import { isViewActivityValid } from './view' -import { exists } from '../misc' -import { isCacheFileObjectValid } from './cache-file' -import { isFlagActivityValid } from './flag' -import { isPlaylistObjectValid } from './playlist' function isRootActivityValid (activity: any) { return isCollection(activity) || isActivity(activity) @@ -70,8 +71,11 @@ function checkFlagActivity (activity: any) { } function checkDislikeActivity (activity: any) { - return isBaseActivityValid(activity, 'Dislike') && - isDislikeActivityValid(activity) + return isDislikeActivityValid(activity) +} + +function checkLikeActivity (activity: any) { + return isLikeActivityValid(activity) } function checkCreateActivity (activity: any) { @@ -118,8 +122,7 @@ function checkRejectActivity (activity: any) { } function checkAnnounceActivity (activity: any) { - return isBaseActivityValid(activity, 'Announce') && - isObjectValid(activity.object) + return isShareActivityValid(activity) } function checkUndoActivity (activity: any) { @@ -132,8 +135,3 @@ function checkUndoActivity (activity: any) { checkCreateActivity(activity.object) ) } - -function checkLikeActivity (activity: any) { - return isBaseActivityValid(activity, 'Like') && - isObjectValid(activity.object) -} diff --git a/server/helpers/custom-validators/activitypub/rate.ts b/server/helpers/custom-validators/activitypub/rate.ts index ba68e8074..aafdda443 100644 --- a/server/helpers/custom-validators/activitypub/rate.ts +++ b/server/helpers/custom-validators/activitypub/rate.ts @@ -1,13 +1,18 @@ -import { isActivityPubUrlValid, isObjectValid } from './misc' +import { isBaseActivityValid, isObjectValid } from './misc' + +function isLikeActivityValid (activity: any) { + return isBaseActivityValid(activity, 'Like') && + isObjectValid(activity.object) +} function isDislikeActivityValid (activity: any) { - return activity.type === 'Dislike' && - isActivityPubUrlValid(activity.actor) && + return isBaseActivityValid(activity, 'Dislike') && isObjectValid(activity.object) } // --------------------------------------------------------------------------- export { - isDislikeActivityValid + isDislikeActivityValid, + isLikeActivityValid } diff --git a/server/helpers/custom-validators/activitypub/share.ts b/server/helpers/custom-validators/activitypub/share.ts new file mode 100644 index 000000000..fb5e4c05e --- /dev/null +++ b/server/helpers/custom-validators/activitypub/share.ts @@ -0,0 +1,11 @@ +import { isBaseActivityValid, isObjectValid } from './misc' + +function isShareActivityValid (activity: any) { + return isBaseActivityValid(activity, 'Announce') && + isObjectValid(activity.object) +} +// --------------------------------------------------------------------------- + +export { + isShareActivityValid +} diff --git a/server/initializers/checker-before-init.ts b/server/initializers/checker-before-init.ts index 2578de5ed..565e0d1fa 100644 --- a/server/initializers/checker-before-init.ts +++ b/server/initializers/checker-before-init.ts @@ -36,7 +36,7 @@ function checkMissedConfig () { 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', 'theme.default', 'remote_redundancy.videos.accept_from', - 'federation.videos.federate_unlisted', + 'federation.videos.federate_unlisted', 'federation.videos.cleanup_remote_interactions', 'search.remote_uri.users', 'search.remote_uri.anonymous', 'search.search_index.enabled', 'search.search_index.url', 'search.search_index.disable_local_search', 'search.search_index.is_default_search', 'live.enabled', 'live.allow_replay', 'live.max_duration', 'live.max_user_lives', 'live.max_instance_lives', diff --git a/server/initializers/config.ts b/server/initializers/config.ts index 21ca78584..c16b63c33 100644 --- a/server/initializers/config.ts +++ b/server/initializers/config.ts @@ -159,7 +159,8 @@ const CONFIG = { }, FEDERATION: { VIDEOS: { - FEDERATE_UNLISTED: config.get('federation.videos.federate_unlisted') + FEDERATE_UNLISTED: config.get('federation.videos.federate_unlisted'), + CLEANUP_REMOTE_INTERACTIONS: config.get('federation.videos.cleanup_remote_interactions') } }, ADMIN: { diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 74192d590..083a29889 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -137,6 +137,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { 'activitypub-http-unicast': 5, 'activitypub-http-fetcher': 5, 'activitypub-follow': 5, + 'activitypub-cleaner': 1, 'video-file-import': 1, 'video-transcoding': 1, 'video-import': 1, @@ -147,10 +148,12 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { 'video-redundancy': 1, 'video-live-ending': 1 } -const JOB_CONCURRENCY: { [id in JobType]?: number } = { +// Excluded keys are jobs that can be configured by admins +const JOB_CONCURRENCY: { [id in Exclude]: number } = { 'activitypub-http-broadcast': 1, 'activitypub-http-unicast': 5, 'activitypub-http-fetcher': 1, + 'activitypub-cleaner': 1, 'activitypub-follow': 1, 'video-file-import': 1, 'email': 5, @@ -165,6 +168,7 @@ const JOB_TTL: { [id in JobType]: number } = { 'activitypub-http-unicast': 60000 * 10, // 10 minutes 'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours 'activitypub-follow': 60000 * 10, // 10 minutes + 'activitypub-cleaner': 1000 * 3600, // 1 hour 'video-file-import': 1000 * 3600, // 1 hour 'video-transcoding': 1000 * 3600 * 48, // 2 days, transcoding could be long 'video-import': 1000 * 3600 * 2, // 2 hours @@ -178,6 +182,9 @@ const JOB_TTL: { [id in JobType]: number } = { const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { 'videos-views': { cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour + }, + 'activitypub-cleaner': { + cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM } } const JOB_PRIORITY = { @@ -188,6 +195,7 @@ const JOB_PRIORITY = { } const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job +const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) const JOB_REQUEST_TIMEOUT = 7000 // 7 seconds const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days @@ -756,6 +764,7 @@ if (isTestInstance() === true) { SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000 SCHEDULER_INTERVALS_MS.updateInboxStats = 5000 REPEAT_JOBS['videos-views'] = { every: 5000 } + REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 @@ -815,6 +824,7 @@ export { REDUNDANCY, JOB_CONCURRENCY, JOB_ATTEMPTS, + AP_CLEANER_CONCURRENCY, LAST_MIGRATION_VERSION, OAUTH_LIFETIME, CUSTOM_HTML_TAG_COMMENTS, diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts index 902d877c4..d025ed7f1 100644 --- a/server/lib/activitypub/video-comments.ts +++ b/server/lib/activitypub/video-comments.ts @@ -41,10 +41,10 @@ async function resolveThread (params: ResolveThreadParams): ResolveThreadResult return await tryResolveThreadFromVideo(params) } } catch (err) { - logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err }) + logger.debug('Cannot resolve thread from video %s, maybe because it was not a video', url, { err }) } - return resolveParentComment(params) + return resolveRemoteParentComment(params) } export { @@ -119,7 +119,7 @@ async function tryResolveThreadFromVideo (params: ResolveThreadParams) { return { video, comment: resultComment, commentCreated } } -async function resolveParentComment (params: ResolveThreadParams) { +async function resolveRemoteParentComment (params: ResolveThreadParams) { const { url, comments } = params if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) { @@ -133,7 +133,7 @@ async function resolveParentComment (params: ResolveThreadParams) { }) if (sanitizeAndCheckVideoCommentObject(body) === false) { - throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body)) + throw new Error(`Remote video comment JSON ${url} is not valid:` + JSON.stringify(body)) } const actorUrl = body.attributedTo diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts new file mode 100644 index 000000000..b58bbc983 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts @@ -0,0 +1,194 @@ +import * as Bluebird from 'bluebird' +import * as Bull from 'bull' +import { checkUrlsSameHost } from '@server/helpers/activitypub' +import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate' +import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share' +import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' +import { doRequest } from '@server/helpers/requests' +import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' +import { VideoModel } from '@server/models/video/video' +import { VideoCommentModel } from '@server/models/video/video-comment' +import { VideoShareModel } from '@server/models/video/video-share' +import { HttpStatusCode } from '@shared/core-utils' +import { logger } from '../../../helpers/logger' +import { AccountVideoRateModel } from '../../../models/account/account-video-rate' + +// Job to clean remote interactions off local videos + +async function processActivityPubCleaner (_job: Bull.Job) { + logger.info('Processing ActivityPub cleaner.') + + { + const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() + const { bodyValidator, deleter, updater } = rateOptionsFactory() + + await Bluebird.map(rateUrls, async rateUrl => { + try { + const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) + + if (result?.status === 'deleted') { + const { videoId, type } = result.data + + await VideoModel.updateRatesOf(videoId, type, undefined) + } + } catch (err) { + logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) + } + }, { concurrency: AP_CLEANER_CONCURRENCY }) + } + + { + const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() + const { bodyValidator, deleter, updater } = shareOptionsFactory() + + await Bluebird.map(shareUrls, async shareUrl => { + try { + await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) + } catch (err) { + logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) + } + }, { concurrency: AP_CLEANER_CONCURRENCY }) + } + + { + const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() + const { bodyValidator, deleter, updater } = commentOptionsFactory() + + await Bluebird.map(commentUrls, async commentUrl => { + try { + await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) + } catch (err) { + logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) + } + }, { concurrency: AP_CLEANER_CONCURRENCY }) + } +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubCleaner +} + +// --------------------------------------------------------------------------- + +async function updateObjectIfNeeded ( + url: string, + bodyValidator: (body: any) => boolean, + updater: (url: string, newUrl: string) => Promise, + deleter: (url: string) => Promise +): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { + // Fetch url + const { response, body } = await doRequest({ + uri: url, + json: true, + activityPub: true + }) + + // Does not exist anymore, remove entry + if (response.statusCode === HttpStatusCode.NOT_FOUND_404) { + logger.info('Removing remote AP object %s.', url) + const data = await deleter(url) + + return { status: 'deleted', data } + } + + // If not same id, check same host and update + if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) + + if (body.type === 'Tombstone') { + logger.info('Removing remote AP object %s.', url) + const data = await deleter(url) + + return { status: 'deleted', data } + } + + const newUrl = body.id + if (newUrl !== url) { + if (checkUrlsSameHost(newUrl, url) !== true) { + throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) + } + + logger.info('Updating remote AP object %s.', url) + const data = await updater(url, newUrl) + + return { status: 'updated', data } + } + + return null +} + +function rateOptionsFactory () { + return { + bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body), + + updater: async (url: string, newUrl: string) => { + const rate = await AccountVideoRateModel.loadByUrl(url, undefined) + rate.url = newUrl + + const videoId = rate.videoId + const type = rate.type + + await rate.save() + + return { videoId, type } + }, + + deleter: async (url) => { + const rate = await AccountVideoRateModel.loadByUrl(url, undefined) + + const videoId = rate.videoId + const type = rate.type + + await rate.destroy() + + return { videoId, type } + } + } +} + +function shareOptionsFactory () { + return { + bodyValidator: (body: any) => isShareActivityValid(body), + + updater: async (url: string, newUrl: string) => { + const share = await VideoShareModel.loadByUrl(url, undefined) + share.url = newUrl + + await share.save() + + return undefined + }, + + deleter: async (url) => { + const share = await VideoShareModel.loadByUrl(url, undefined) + + await share.destroy() + + return undefined + } + } +} + +function commentOptionsFactory () { + return { + bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body), + + updater: async (url: string, newUrl: string) => { + const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) + comment.url = newUrl + + await comment.save() + + return undefined + }, + + deleter: async (url) => { + const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) + + await comment.destroy() + + return undefined + } + } +} diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 8da549640..125307843 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts @@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' async function processActorKeys (job: Bull.Job) { const payload = job.data as ActorKeysPayload - logger.info('Processing email in job %d.', job.id) + logger.info('Processing actor keys in job %d.', job.id) const actor = await ActorModel.load(payload.actorId) diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index efda2e038..42e8347b1 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -21,6 +21,7 @@ import { import { logger } from '../../helpers/logger' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' import { Redis } from '../redis' +import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' @@ -38,6 +39,7 @@ type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | + { type: 'activitypub-http-cleaner', payload: {} } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-transcoding', payload: VideoTranscodingPayload } | @@ -58,6 +60,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-cleaner': processActivityPubCleaner, 'activitypub-follow': processActivityPubFollow, 'video-file-import': processVideoFileImport, 'video-transcoding': processVideoTranscoding, @@ -75,6 +78,7 @@ const jobTypes: JobType[] = [ 'activitypub-http-broadcast', 'activitypub-http-fetcher', 'activitypub-http-unicast', + 'activitypub-cleaner', 'email', 'video-transcoding', 'video-file-import', @@ -233,6 +237,12 @@ class JobQueue { this.queues['videos-views'].add({}, { repeat: REPEAT_JOBS['videos-views'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) + + if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { + this.queues['activitypub-cleaner'].add({}, { + repeat: REPEAT_JOBS['activitypub-cleaner'] + }).catch(err => logger.error('Cannot add repeatable job.', { err })) + } } private filterJobTypes (jobType?: JobType) { diff --git a/server/middlewares/validators/videos/video-rates.ts b/server/middlewares/validators/videos/video-rates.ts index 7dcba15f1..01bdef25f 100644 --- a/server/middlewares/validators/videos/video-rates.ts +++ b/server/middlewares/validators/videos/video-rates.ts @@ -1,6 +1,6 @@ import * as express from 'express' import { body, param, query } from 'express-validator' -import { isIdOrUUIDValid } from '../../../helpers/custom-validators/misc' +import { isIdOrUUIDValid, isIdValid } from '../../../helpers/custom-validators/misc' import { isRatingValid } from '../../../helpers/custom-validators/video-rates' import { isVideoRatingTypeValid } from '../../../helpers/custom-validators/videos' import { logger } from '../../../helpers/logger' @@ -28,14 +28,14 @@ const videoUpdateRateValidator = [ const getAccountVideoRateValidatorFactory = function (rateType: VideoRateType) { return [ param('name').custom(isAccountNameValid).withMessage('Should have a valid account name'), - param('videoId').custom(isIdOrUUIDValid).not().isEmpty().withMessage('Should have a valid videoId'), + param('videoId').custom(isIdValid).not().isEmpty().withMessage('Should have a valid videoId'), async (req: express.Request, res: express.Response, next: express.NextFunction) => { logger.debug('Checking videoCommentGetValidator parameters.', { parameters: req.params }) if (areValidationErrors(req, res)) return - const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, req.params.videoId) + const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, +req.params.videoId) if (!rate) { return res.status(HttpStatusCode.NOT_FOUND_404) .json({ error: 'Video rate not found' }) diff --git a/server/models/account/account-video-rate.ts b/server/models/account/account-video-rate.ts index d9c529491..801f76bba 100644 --- a/server/models/account/account-video-rate.ts +++ b/server/models/account/account-video-rate.ts @@ -146,10 +146,22 @@ export class AccountVideoRateModel extends Model { return AccountVideoRateModel.findAndCountAll(query) } + static listRemoteRateUrlsOfLocalVideos () { + const query = `SELECT "accountVideoRate".url FROM "accountVideoRate" ` + + `INNER JOIN account ON account.id = "accountVideoRate"."accountId" ` + + `INNER JOIN actor ON actor.id = account."actorId" AND actor."serverId" IS NOT NULL ` + + `INNER JOIN video ON video.id = "accountVideoRate"."videoId" AND video.remote IS FALSE` + + return AccountVideoRateModel.sequelize.query<{ url: string }>(query, { + type: QueryTypes.SELECT, + raw: true + }).then(rows => rows.map(r => r.url)) + } + static loadLocalAndPopulateVideo ( rateType: VideoRateType, accountName: string, - videoId: number | string, + videoId: number, t?: Transaction ): Promise { const options: FindOptions = { @@ -241,21 +253,7 @@ export class AccountVideoRateModel extends Model { await AccountVideoRateModel.destroy(query) - const field = type === 'like' - ? 'likes' - : 'dislikes' - - const rawQuery = `UPDATE "video" SET "${field}" = ` + - '(' + - 'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' + - ') ' + - 'WHERE "video"."id" = :videoId' - - return AccountVideoRateModel.sequelize.query(rawQuery, { - transaction: t, - replacements: { videoId, rateType: type }, - type: QueryTypes.UPDATE - }) + return VideoModel.updateRatesOf(videoId, type, t) }) } diff --git a/server/models/video/video-comment.ts b/server/models/video/video-comment.ts index dc7556d44..151c2bc81 100644 --- a/server/models/video/video-comment.ts +++ b/server/models/video/video-comment.ts @@ -1,5 +1,5 @@ import { uniq } from 'lodash' -import { FindAndCountOptions, FindOptions, Op, Order, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize' +import { FindAndCountOptions, FindOptions, Op, Order, QueryTypes, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize' import { AllowNull, BelongsTo, @@ -696,6 +696,18 @@ export class VideoCommentModel extends Model { } } + static listRemoteCommentUrlsOfLocalVideos () { + const query = `SELECT "videoComment".url FROM "videoComment" ` + + `INNER JOIN account ON account.id = "videoComment"."accountId" ` + + `INNER JOIN actor ON actor.id = "account"."actorId" AND actor."serverId" IS NOT NULL ` + + `INNER JOIN video ON video.id = "videoComment"."videoId" AND video.remote IS FALSE` + + return VideoCommentModel.sequelize.query<{ url: string }>(query, { + type: QueryTypes.SELECT, + raw: true + }).then(rows => rows.map(r => r.url)) + } + static cleanOldCommentsOf (videoId: number, beforeUpdatedAt: Date) { const query = { where: { diff --git a/server/models/video/video-share.ts b/server/models/video/video-share.ts index b7f5f3fa3..5059c1fa6 100644 --- a/server/models/video/video-share.ts +++ b/server/models/video/video-share.ts @@ -1,4 +1,4 @@ -import { literal, Op, Transaction } from 'sequelize' +import { literal, Op, QueryTypes, Transaction } from 'sequelize' import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Scopes, Table, UpdatedAt } from 'sequelize-typescript' import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc' import { CONSTRAINTS_FIELDS } from '../../initializers/constants' @@ -185,6 +185,17 @@ export class VideoShareModel extends Model { return VideoShareModel.findAndCountAll(query) } + static listRemoteShareUrlsOfLocalVideos () { + const query = `SELECT "videoShare".url FROM "videoShare" ` + + `INNER JOIN actor ON actor.id = "videoShare"."actorId" AND actor."serverId" IS NOT NULL ` + + `INNER JOIN video ON video.id = "videoShare"."videoId" AND video.remote IS FALSE` + + return VideoShareModel.sequelize.query<{ url: string }>(query, { + type: QueryTypes.SELECT, + raw: true + }).then(rows => rows.map(r => r.url)) + } + static cleanOldSharesOf (videoId: number, beforeUpdatedAt: Date) { const query = { where: { diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 8894843e0..b4c7da655 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -34,7 +34,7 @@ import { ModelCache } from '@server/models/model-cache' import { VideoFile } from '@shared/models/videos/video-file.model' import { ResultList, UserRight, VideoPrivacy, VideoState } from '../../../shared' import { VideoObject } from '../../../shared/models/activitypub/objects' -import { Video, VideoDetails } from '../../../shared/models/videos' +import { Video, VideoDetails, VideoRateType } from '../../../shared/models/videos' import { ThumbnailType } from '../../../shared/models/videos/thumbnail.type' import { VideoFilter } from '../../../shared/models/videos/video-query.type' import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type' @@ -1509,6 +1509,24 @@ export class VideoModel extends Model { }) } + static updateRatesOf (videoId: number, type: VideoRateType, t: Transaction) { + const field = type === 'like' + ? 'likes' + : 'dislikes' + + const rawQuery = `UPDATE "video" SET "${field}" = ` + + '(' + + 'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' + + ') ' + + 'WHERE "video"."id" = :videoId' + + return AccountVideoRateModel.sequelize.query(rawQuery, { + transaction: t, + replacements: { videoId, rateType: type }, + type: QueryTypes.UPDATE + }) + } + static checkVideoHasInstanceFollow (videoId: number, followerActorId: number) { // Instances only share videos const query = 'SELECT 1 FROM "videoShare" ' + diff --git a/server/tests/api/activitypub/cleaner.ts b/server/tests/api/activitypub/cleaner.ts new file mode 100644 index 000000000..75ef56ce3 --- /dev/null +++ b/server/tests/api/activitypub/cleaner.ts @@ -0,0 +1,283 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' +import * as chai from 'chai' +import { + cleanupTests, + closeAllSequelize, + deleteAll, + doubleFollow, + getCount, + selectQuery, + setVideoField, + updateQuery, + wait +} from '../../../../shared/extra-utils' +import { flushAndRunMultipleServers, ServerInfo, setAccessTokensToServers } from '../../../../shared/extra-utils/index' +import { waitJobs } from '../../../../shared/extra-utils/server/jobs' +import { addVideoCommentThread, getVideoCommentThreads } from '../../../../shared/extra-utils/videos/video-comments' +import { getVideo, rateVideo, uploadVideoAndGetId } from '../../../../shared/extra-utils/videos/videos' + +const expect = chai.expect + +describe('Test AP cleaner', function () { + let servers: ServerInfo[] = [] + let videoUUID1: string + let videoUUID2: string + let videoUUID3: string + + let videoUUIDs: string[] + + before(async function () { + this.timeout(120000) + + const config = { + federation: { + videos: { cleanup_remote_interactions: true } + } + } + servers = await flushAndRunMultipleServers(3, config) + + // Get the access tokens + await setAccessTokensToServers(servers) + + await Promise.all([ + doubleFollow(servers[0], servers[1]), + doubleFollow(servers[1], servers[2]), + doubleFollow(servers[0], servers[2]) + ]) + + // Update 1 local share, check 6 shares + + // Create 1 comment per video + // Update 1 remote URL and 1 local URL on + + videoUUID1 = (await uploadVideoAndGetId({ server: servers[0], videoName: 'server 1' })).uuid + videoUUID2 = (await uploadVideoAndGetId({ server: servers[1], videoName: 'server 2' })).uuid + videoUUID3 = (await uploadVideoAndGetId({ server: servers[2], videoName: 'server 3' })).uuid + + videoUUIDs = [ videoUUID1, videoUUID2, videoUUID3 ] + + await waitJobs(servers) + + for (const server of servers) { + for (const uuid of videoUUIDs) { + await rateVideo(server.url, server.accessToken, uuid, 'like') + await addVideoCommentThread(server.url, server.accessToken, uuid, 'comment') + } + } + + await waitJobs(servers) + }) + + it('Should have the correct likes', async function () { + for (const server of servers) { + for (const uuid of videoUUIDs) { + const res = await getVideo(server.url, uuid) + expect(res.body.likes).to.equal(3) + expect(res.body.dislikes).to.equal(0) + } + } + }) + + it('Should destroy server 3 internal likes and correctly clean them', async function () { + this.timeout(20000) + + await deleteAll(servers[2].internalServerNumber, 'accountVideoRate') + for (const uuid of videoUUIDs) { + await setVideoField(servers[2].internalServerNumber, uuid, 'likes', '0') + } + + await wait(5000) + await waitJobs(servers) + + // Updated rates of my video + { + const res = await getVideo(servers[0].url, videoUUID1) + expect(res.body.likes).to.equal(2) + expect(res.body.dislikes).to.equal(0) + } + + // Did not update rates of a remote video + { + const res = await getVideo(servers[0].url, videoUUID2) + expect(res.body.likes).to.equal(3) + expect(res.body.dislikes).to.equal(0) + } + }) + + it('Should update rates to dislikes', async function () { + this.timeout(20000) + + for (const server of servers) { + for (const uuid of videoUUIDs) { + await rateVideo(server.url, server.accessToken, uuid, 'dislike') + } + } + + await waitJobs(servers) + + for (const server of servers) { + for (const uuid of videoUUIDs) { + const res = await getVideo(server.url, uuid) + expect(res.body.likes).to.equal(0) + expect(res.body.dislikes).to.equal(3) + } + } + }) + + it('Should destroy server 3 internal dislikes and correctly clean them', async function () { + this.timeout(20000) + + await deleteAll(servers[2].internalServerNumber, 'accountVideoRate') + + for (const uuid of videoUUIDs) { + await setVideoField(servers[2].internalServerNumber, uuid, 'dislikes', '0') + } + + await wait(5000) + await waitJobs(servers) + + // Updated rates of my video + { + const res = await getVideo(servers[0].url, videoUUID1) + expect(res.body.likes).to.equal(0) + expect(res.body.dislikes).to.equal(2) + } + + // Did not update rates of a remote video + { + const res = await getVideo(servers[0].url, videoUUID2) + expect(res.body.likes).to.equal(0) + expect(res.body.dislikes).to.equal(3) + } + }) + + it('Should destroy server 3 internal shares and correctly clean them', async function () { + this.timeout(20000) + + const preCount = await getCount(servers[0].internalServerNumber, 'videoShare') + expect(preCount).to.equal(6) + + await deleteAll(servers[2].internalServerNumber, 'videoShare') + await wait(5000) + await waitJobs(servers) + + // Still 6 because we don't have remote shares on local videos + const postCount = await getCount(servers[0].internalServerNumber, 'videoShare') + expect(postCount).to.equal(6) + }) + + it('Should destroy server 3 internal comments and correctly clean them', async function () { + this.timeout(20000) + + { + const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5) + expect(res.body.total).to.equal(3) + } + + await deleteAll(servers[2].internalServerNumber, 'videoComment') + + await wait(5000) + await waitJobs(servers) + + { + const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5) + expect(res.body.total).to.equal(2) + } + }) + + it('Should correctly update rate URLs', async function () { + this.timeout(30000) + + async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') { + const query = `SELECT "videoId", "accountVideoRate".url FROM "accountVideoRate" ` + + `INNER JOIN video ON "accountVideoRate"."videoId" = video.id AND remote IS ${remote} WHERE "accountVideoRate"."url" LIKE '${like}'` + const res = await selectQuery(servers[0].internalServerNumber, query) + + for (const rate of res) { + const matcher = new RegExp(`^${ofServerUrl}/accounts/root/dislikes/\\d+${urlSuffix}$`) + expect(rate.url).to.match(matcher) + } + } + + async function checkLocal () { + const startsWith = 'http://' + servers[0].host + '%' + // On local videos + await check(startsWith, servers[0].url, '', 'false') + // On remote videos + await check(startsWith, servers[0].url, '', 'true') + } + + async function checkRemote (suffix: string) { + const startsWith = 'http://' + servers[1].host + '%' + // On local videos + await check(startsWith, servers[1].url, suffix, 'false') + // On remote videos, we should not update URLs so no suffix + await check(startsWith, servers[1].url, '', 'true') + } + + await checkLocal() + await checkRemote('') + + { + const query = `UPDATE "accountVideoRate" SET url = url || 'stan'` + await updateQuery(servers[1].internalServerNumber, query) + + await wait(5000) + await waitJobs(servers) + } + + await checkLocal() + await checkRemote('stan') + }) + + it('Should correctly update comment URLs', async function () { + this.timeout(30000) + + async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') { + const query = `SELECT "videoId", "videoComment".url, uuid as "videoUUID" FROM "videoComment" ` + + `INNER JOIN video ON "videoComment"."videoId" = video.id AND remote IS ${remote} WHERE "videoComment"."url" LIKE '${like}'` + + const res = await selectQuery(servers[0].internalServerNumber, query) + + for (const comment of res) { + const matcher = new RegExp(`${ofServerUrl}/videos/watch/${comment.videoUUID}/comments/\\d+${urlSuffix}`) + expect(comment.url).to.match(matcher) + } + } + + async function checkLocal () { + const startsWith = 'http://' + servers[0].host + '%' + // On local videos + await check(startsWith, servers[0].url, '', 'false') + // On remote videos + await check(startsWith, servers[0].url, '', 'true') + } + + async function checkRemote (suffix: string) { + const startsWith = 'http://' + servers[1].host + '%' + // On local videos + await check(startsWith, servers[1].url, suffix, 'false') + // On remote videos, we should not update URLs so no suffix + await check(startsWith, servers[1].url, '', 'true') + } + + { + const query = `UPDATE "videoComment" SET url = url || 'kyle'` + await updateQuery(servers[1].internalServerNumber, query) + + await wait(5000) + await waitJobs(servers) + } + + await checkLocal() + await checkRemote('kyle') + }) + + after(async function () { + await cleanupTests(servers) + + await closeAllSequelize(servers) + }) +}) diff --git a/server/tests/api/activitypub/index.ts b/server/tests/api/activitypub/index.ts index 92bd6f660..324b444e4 100644 --- a/server/tests/api/activitypub/index.ts +++ b/server/tests/api/activitypub/index.ts @@ -1,3 +1,4 @@ +import './cleaner' import './client' import './fetch' import './refresher' diff --git a/shared/extra-utils/miscs/sql.ts b/shared/extra-utils/miscs/sql.ts index e68812e1b..740f0c2d6 100644 --- a/shared/extra-utils/miscs/sql.ts +++ b/shared/extra-utils/miscs/sql.ts @@ -24,6 +24,25 @@ function getSequelize (internalServerNumber: number) { return seq } +function deleteAll (internalServerNumber: number, table: string) { + const seq = getSequelize(internalServerNumber) + + const options = { type: QueryTypes.DELETE } + + return seq.query(`DELETE FROM "${table}"`, options) +} + +async function getCount (internalServerNumber: number, table: string) { + const seq = getSequelize(internalServerNumber) + + const options = { type: QueryTypes.SELECT as QueryTypes.SELECT } + + const [ { total } ] = await seq.query<{ total: string }>(`SELECT COUNT(*) as total FROM "${table}"`, options) + if (total === null) return 0 + + return parseInt(total, 10) +} + function setActorField (internalServerNumber: number, to: string, field: string, value: string) { const seq = getSequelize(internalServerNumber) @@ -63,6 +82,20 @@ async function countVideoViewsOf (internalServerNumber: number, uuid: string) { return parseInt(total + '', 10) } +function selectQuery (internalServerNumber: number, query: string) { + const seq = getSequelize(internalServerNumber) + const options = { type: QueryTypes.SELECT as QueryTypes.SELECT } + + return seq.query(query, options) +} + +function updateQuery (internalServerNumber: number, query: string) { + const seq = getSequelize(internalServerNumber) + const options = { type: QueryTypes.UPDATE as QueryTypes.UPDATE } + + return seq.query(query, options) +} + async function closeAllSequelize (servers: ServerInfo[]) { for (const server of servers) { if (sequelizes[server.internalServerNumber]) { @@ -95,6 +128,10 @@ export { setActorField, countVideoViewsOf, setPluginVersion, + selectQuery, + deleteAll, + updateQuery, setActorFollowScores, - closeAllSequelize + closeAllSequelize, + getCount } diff --git a/shared/extra-utils/server/jobs.ts b/shared/extra-utils/server/jobs.ts index 97971f960..704929bd4 100644 --- a/shared/extra-utils/server/jobs.ts +++ b/shared/extra-utils/server/jobs.ts @@ -63,6 +63,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { else servers = serversArg as ServerInfo[] const states: JobState[] = [ 'waiting', 'active', 'delayed' ] + const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ] let pendingRequests: boolean function tasksBuilder () { @@ -79,7 +80,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { count: 10, sort: '-createdAt' }).then(res => res.body.data) - .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views')) + .then((jobs: Job[]) => jobs.filter(j => !repeatableJobs.includes(j.type))) .then(jobs => { if (jobs.length !== 0) { pendingRequests = true diff --git a/shared/extra-utils/videos/videos.ts b/shared/extra-utils/videos/videos.ts index 0b6a54046..67fe82d41 100644 --- a/shared/extra-utils/videos/videos.ts +++ b/shared/extra-utils/videos/videos.ts @@ -498,7 +498,7 @@ function updateVideo ( }) } -function rateVideo (url: string, accessToken: string, id: number, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) { +function rateVideo (url: string, accessToken: string, id: number | string, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) { const path = '/api/v1/videos/' + id + '/rate' return request(url) diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index c693827b0..83ef84457 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -8,6 +8,7 @@ export type JobType = | 'activitypub-http-unicast' | 'activitypub-http-broadcast' | 'activitypub-http-fetcher' + | 'activitypub-cleaner' | 'activitypub-follow' | 'video-file-import' | 'video-transcoding'