diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 7816561fd..c899812a6 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -19,7 +19,7 @@ import { NSFWPolicyType } from '../../shared/models/videos/nsfw-policy.type' import { VideoPlaylistPrivacy } from '../../shared/models/videos/playlist/video-playlist-privacy.model' import { VideoPlaylistType } from '../../shared/models/videos/playlist/video-playlist-type.model' // Do not use barrels, remain constants as independent as possible -import { isTestInstance, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' +import { isTestInstance, parseDurationToMs, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' import { CONFIG, registerConfigChangedHandler } from './config' // --------------------------------------------------------------------------- @@ -200,8 +200,14 @@ const JOB_PRIORITY = { } const BROADCAST_CONCURRENCY = 30 // How many requests in parallel we do in activitypub-http-broadcast job -const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) + +const AP_CLEANER = { + CONCURRENCY: 10, // How many requests in parallel we do in activitypub-cleaner job + UNAVAILABLE_TRESHOLD: 3, // How many attemps we do before removing an unavailable remote resource + PERIOD: parseDurationToMs('1 week') // /!\ Has to be sync with REPEAT_JOBS +} + const REQUEST_TIMEOUTS = { DEFAULT: 7000, // 7 seconds FILE: 30000, // 30 seconds @@ -796,8 +802,11 @@ if (isTestInstance() === true) { SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 + REPEAT_JOBS['videos-views-stats'] = { every: 5000 } + REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } + AP_CLEANER.PERIOD = 5000 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 @@ -858,7 +867,7 @@ export { REDUNDANCY, JOB_CONCURRENCY, JOB_ATTEMPTS, - AP_CLEANER_CONCURRENCY, + AP_CLEANER, LAST_MIGRATION_VERSION, OAUTH_LIFETIME, CUSTOM_HTML_TAG_COMMENTS, diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index d5e4508fe..1540bf23a 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts @@ -8,7 +8,8 @@ import { } from '@server/helpers/custom-validators/activitypub/activity' import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' -import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' +import { AP_CLEANER } from '@server/initializers/constants' +import { Redis } from '@server/lib/redis' import { VideoModel } from '@server/models/video/video' import { VideoCommentModel } from '@server/models/video/video-comment' import { VideoShareModel } from '@server/models/video/video-share' @@ -27,7 +28,7 @@ async function processActivityPubCleaner (_job: Job) { await map(rateUrls, async rateUrl => { try { - const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) + const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) if (result?.status === 'deleted') { const { videoId, type } = result.data @@ -37,7 +38,7 @@ async function processActivityPubCleaner (_job: Job) { } catch (err) { logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) } - }, { concurrency: AP_CLEANER_CONCURRENCY }) + }, { concurrency: AP_CLEANER.CONCURRENCY }) } { @@ -46,11 +47,11 @@ async function processActivityPubCleaner (_job: Job) { await map(shareUrls, async shareUrl => { try { - await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) + await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) } catch (err) { logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) } - }, { concurrency: AP_CLEANER_CONCURRENCY }) + }, { concurrency: AP_CLEANER.CONCURRENCY }) } { @@ -59,11 +60,11 @@ async function processActivityPubCleaner (_job: Job) { await map(commentUrls, async commentUrl => { try { - await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) + await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) } catch (err) { logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) } - }, { concurrency: AP_CLEANER_CONCURRENCY }) + }, { concurrency: AP_CLEANER.CONCURRENCY }) } } @@ -75,12 +76,14 @@ export { // --------------------------------------------------------------------------- -async function updateObjectIfNeeded ( - url: string, - bodyValidator: (body: any) => boolean, - updater: (url: string, newUrl: string) => Promise, - deleter: (url: string) => Promise +async function updateObjectIfNeeded (options: { + url: string + bodyValidator: (body: any) => boolean + updater: (url: string, newUrl: string) => Promise + deleter: (url: string) => Promise } ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { + const { url, bodyValidator, updater, deleter } = options + const on404OrTombstone = async () => { logger.info('Removing remote AP object %s.', url) const data = await deleter(url) @@ -117,7 +120,15 @@ async function updateObjectIfNeeded ( return on404OrTombstone() } - throw err + logger.debug('Remote AP object %s is unavailable.', url) + + const unavailability = await Redis.Instance.addAPUnavailability(url) + if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { + logger.info('Removing unavailable AP resource %s.', url) + return on404OrTombstone() + } + + return null } } diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 9ef9d7702..4dcbcddb5 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -1,9 +1,11 @@ import { createClient } from 'redis' import { exists } from '@server/helpers/custom-validators/misc' +import { sha256 } from '@shared/extra-utils' import { logger } from '../helpers/logger' import { generateRandomString } from '../helpers/utils' import { CONFIG } from '../initializers/config' import { + AP_CLEANER, CONTACT_FORM_LIFETIME, RESUMABLE_UPLOAD_SESSION_LIFETIME, TRACKER_RATE_LIMITS, @@ -260,6 +262,17 @@ class Redis { return this.deleteKey('resumable-upload-' + uploadId) } + /* ************ AP ressource unavailability ************ */ + + async addAPUnavailability (url: string) { + const key = this.generateAPUnavailabilityKey(url) + + const value = await this.increment(key) + await this.setExpiration(key, AP_CLEANER.PERIOD * 2) + + return value + } + /* ************ Keys generation ************ */ private generateLocalVideoViewsKeys (videoId?: Number) { @@ -298,6 +311,10 @@ class Redis { return 'contact-form-' + ip } + private generateAPUnavailabilityKey (url: string) { + return 'ap-unavailability-' + sha256(url) + } + /* ************ Redis helpers ************ */ private getValue (key: string) { @@ -330,10 +347,6 @@ class Redis { return this.client.del(this.prefix + key) } - private getObject (key: string) { - return this.client.hGetAll(this.prefix + key) - } - private increment (key: string) { return this.client.incr(this.prefix + key) } @@ -342,6 +355,10 @@ class Redis { return this.client.exists(this.prefix + key) } + private setExpiration (key: string, ms: number) { + return this.client.expire(this.prefix + key, ms / 1000) + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/tests/api/activitypub/cleaner.ts b/server/tests/api/activitypub/cleaner.ts index 7a443b553..dc885023a 100644 --- a/server/tests/api/activitypub/cleaner.ts +++ b/server/tests/api/activitypub/cleaner.ts @@ -270,6 +270,59 @@ describe('Test AP cleaner', function () { await checkRemote('kyle') }) + it('Should remove unavailable remote resources', async function () { + this.timeout(240000) + + async function expectNotDeleted () { + { + const video = await servers[0].videos.get({ id: uuid }) + + expect(video.likes).to.equal(3) + expect(video.dislikes).to.equal(0) + } + + { + const { total } = await servers[0].comments.listThreads({ videoId: uuid }) + expect(total).to.equal(3) + } + } + + async function expectDeleted () { + { + const video = await servers[0].videos.get({ id: uuid }) + + expect(video.likes).to.equal(3) + expect(video.dislikes).to.equal(0) + } + + { + const { total } = await servers[0].comments.listThreads({ videoId: videoUUID1 }) + expect(total).to.equal(2) + } + } + + const uuid = (await servers[0].videos.quickUpload({ name: 'server 1 video 2' })).uuid + + await waitJobs(servers) + + for (const server of servers) { + await server.videos.rate({ id: uuid, rating: 'like' }) + await server.comments.createThread({ videoId: uuid, text: 'comment' }) + } + + await waitJobs(servers) + + await expectNotDeleted() + + await servers[1].kill() + + await wait(5000) + await expectNotDeleted() + + await wait(15000) + await expectDeleted() + }) + after(async function () { await cleanupTests(servers) })