Cleanup unavailable remote AP resource

This commit is contained in:
Chocobozzz 2021-12-28 11:36:51 +01:00
parent 21d68e6803
commit f1569117f9
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
4 changed files with 110 additions and 20 deletions

View File

@ -19,7 +19,7 @@ import { NSFWPolicyType } from '../../shared/models/videos/nsfw-policy.type'
import { VideoPlaylistPrivacy } from '../../shared/models/videos/playlist/video-playlist-privacy.model' import { VideoPlaylistPrivacy } from '../../shared/models/videos/playlist/video-playlist-privacy.model'
import { VideoPlaylistType } from '../../shared/models/videos/playlist/video-playlist-type.model' import { VideoPlaylistType } from '../../shared/models/videos/playlist/video-playlist-type.model'
// Do not use barrels, remain constants as independent as possible // Do not use barrels, remain constants as independent as possible
import { isTestInstance, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' import { isTestInstance, parseDurationToMs, sanitizeHost, sanitizeUrl } from '../helpers/core-utils'
import { CONFIG, registerConfigChangedHandler } from './config' import { CONFIG, registerConfigChangedHandler } from './config'
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -200,8 +200,14 @@ const JOB_PRIORITY = {
} }
const BROADCAST_CONCURRENCY = 30 // How many requests in parallel we do in activitypub-http-broadcast job const BROADCAST_CONCURRENCY = 30 // How many requests in parallel we do in activitypub-http-broadcast job
const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job
const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...)
// Settings of the activitypub-cleaner job
const AP_CLEANER = {
CONCURRENCY: 10, // How many requests in parallel we do in activitypub-cleaner job
UNAVAILABLE_TRESHOLD: 3, // How many attempts we do before removing an unavailable remote resource
PERIOD: parseDurationToMs('1 week') // /!\ Has to be kept in sync with the REPEAT_JOBS 'activitypub-cleaner' entry
}
const REQUEST_TIMEOUTS = { const REQUEST_TIMEOUTS = {
DEFAULT: 7000, // 7 seconds DEFAULT: 7000, // 7 seconds
FILE: 30000, // 30 seconds FILE: 30000, // 30 seconds
@ -796,8 +802,11 @@ if (isTestInstance() === true) {
SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000
SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000
SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000
REPEAT_JOBS['videos-views-stats'] = { every: 5000 } REPEAT_JOBS['videos-views-stats'] = { every: 5000 }
REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
AP_CLEANER.PERIOD = 5000
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
@ -858,7 +867,7 @@ export {
REDUNDANCY, REDUNDANCY,
JOB_CONCURRENCY, JOB_CONCURRENCY,
JOB_ATTEMPTS, JOB_ATTEMPTS,
AP_CLEANER_CONCURRENCY, AP_CLEANER,
LAST_MIGRATION_VERSION, LAST_MIGRATION_VERSION,
OAUTH_LIFETIME, OAUTH_LIFETIME,
CUSTOM_HTML_TAG_COMMENTS, CUSTOM_HTML_TAG_COMMENTS,

View File

@ -8,7 +8,8 @@ import {
} from '@server/helpers/custom-validators/activitypub/activity' } from '@server/helpers/custom-validators/activitypub/activity'
import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' import { AP_CLEANER } from '@server/initializers/constants'
import { Redis } from '@server/lib/redis'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { VideoCommentModel } from '@server/models/video/video-comment' import { VideoCommentModel } from '@server/models/video/video-comment'
import { VideoShareModel } from '@server/models/video/video-share' import { VideoShareModel } from '@server/models/video/video-share'
@ -27,7 +28,7 @@ async function processActivityPubCleaner (_job: Job) {
await map(rateUrls, async rateUrl => { await map(rateUrls, async rateUrl => {
try { try {
const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })
if (result?.status === 'deleted') { if (result?.status === 'deleted') {
const { videoId, type } = result.data const { videoId, type } = result.data
@ -37,7 +38,7 @@ async function processActivityPubCleaner (_job: Job) {
} catch (err) { } catch (err) {
logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
} }
}, { concurrency: AP_CLEANER_CONCURRENCY }) }, { concurrency: AP_CLEANER.CONCURRENCY })
} }
{ {
@ -46,11 +47,11 @@ async function processActivityPubCleaner (_job: Job) {
await map(shareUrls, async shareUrl => { await map(shareUrls, async shareUrl => {
try { try {
await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
} catch (err) { } catch (err) {
logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
} }
}, { concurrency: AP_CLEANER_CONCURRENCY }) }, { concurrency: AP_CLEANER.CONCURRENCY })
} }
{ {
@ -59,11 +60,11 @@ async function processActivityPubCleaner (_job: Job) {
await map(commentUrls, async commentUrl => { await map(commentUrls, async commentUrl => {
try { try {
await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
} catch (err) { } catch (err) {
logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
} }
}, { concurrency: AP_CLEANER_CONCURRENCY }) }, { concurrency: AP_CLEANER.CONCURRENCY })
} }
} }
@ -75,12 +76,14 @@ export {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
async function updateObjectIfNeeded <T> ( async function updateObjectIfNeeded <T> (options: {
url: string, url: string
bodyValidator: (body: any) => boolean, bodyValidator: (body: any) => boolean
updater: (url: string, newUrl: string) => Promise<T>, updater: (url: string, newUrl: string) => Promise<T>
deleter: (url: string) => Promise<T> deleter: (url: string) => Promise<T> }
): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
const { url, bodyValidator, updater, deleter } = options
const on404OrTombstone = async () => { const on404OrTombstone = async () => {
logger.info('Removing remote AP object %s.', url) logger.info('Removing remote AP object %s.', url)
const data = await deleter(url) const data = await deleter(url)
@ -117,7 +120,15 @@ async function updateObjectIfNeeded <T> (
return on404OrTombstone() return on404OrTombstone()
} }
throw err logger.debug('Remote AP object %s is unavailable.', url)
const unavailability = await Redis.Instance.addAPUnavailability(url)
if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
logger.info('Removing unavailable AP resource %s.', url)
return on404OrTombstone()
}
return null
} }
} }

View File

@ -1,9 +1,11 @@
import { createClient } from 'redis' import { createClient } from 'redis'
import { exists } from '@server/helpers/custom-validators/misc' import { exists } from '@server/helpers/custom-validators/misc'
import { sha256 } from '@shared/extra-utils'
import { logger } from '../helpers/logger' import { logger } from '../helpers/logger'
import { generateRandomString } from '../helpers/utils' import { generateRandomString } from '../helpers/utils'
import { CONFIG } from '../initializers/config' import { CONFIG } from '../initializers/config'
import { import {
AP_CLEANER,
CONTACT_FORM_LIFETIME, CONTACT_FORM_LIFETIME,
RESUMABLE_UPLOAD_SESSION_LIFETIME, RESUMABLE_UPLOAD_SESSION_LIFETIME,
TRACKER_RATE_LIMITS, TRACKER_RATE_LIMITS,
@ -260,6 +262,17 @@ class Redis {
return this.deleteKey('resumable-upload-' + uploadId) return this.deleteKey('resumable-upload-' + uploadId)
} }
/* ************ AP resource unavailability ************ */
/**
 * Records one more failed attempt to reach a remote AP resource.
 *
 * Increments the counter stored under the key derived from `url`, then sets the
 * expiration of that key to twice AP_CLEANER.PERIOD.
 *
 * @param url URL of the unavailable remote resource
 * @returns the counter value after the increment
 */
async addAPUnavailability (url: string) {
  const unavailabilityKey = this.generateAPUnavailabilityKey(url)

  const failedAttempts = await this.increment(unavailabilityKey)
  await this.setExpiration(unavailabilityKey, AP_CLEANER.PERIOD * 2)

  return failedAttempts
}
/* ************ Keys generation ************ */ /* ************ Keys generation ************ */
private generateLocalVideoViewsKeys (videoId?: Number) { private generateLocalVideoViewsKeys (videoId?: Number) {
@ -298,6 +311,10 @@ class Redis {
return 'contact-form-' + ip return 'contact-form-' + ip
} }
/**
 * Builds the key (without `this.prefix`) of the unavailability counter of a remote AP resource.
 * The URL is hashed with sha256 before being appended to the key name.
 */
private generateAPUnavailabilityKey (url: string) {
  const urlHash = sha256(url)

  return 'ap-unavailability-' + urlHash
}
/* ************ Redis helpers ************ */ /* ************ Redis helpers ************ */
private getValue (key: string) { private getValue (key: string) {
@ -330,10 +347,6 @@ class Redis {
return this.client.del(this.prefix + key) return this.client.del(this.prefix + key)
} }
private getObject (key: string) {
return this.client.hGetAll(this.prefix + key)
}
private increment (key: string) { private increment (key: string) {
return this.client.incr(this.prefix + key) return this.client.incr(this.prefix + key)
} }
@ -342,6 +355,10 @@ class Redis {
return this.client.exists(this.prefix + key) return this.client.exists(this.prefix + key)
} }
/**
 * Sets a time to live on the key `this.prefix + key`.
 *
 * @param key key name, without the prefix
 * @param ms time to live in milliseconds
 * @returns the result of the client `expire` call
 */
private setExpiration (key: string, ms: number) {
  // EXPIRE only accepts an integer number of seconds: round up so that a
  // sub-second remainder neither makes the command fail nor shortens the lifetime
  const seconds = Math.ceil(ms / 1000)

  return this.client.expire(this.prefix + key, seconds)
}
static get Instance () { static get Instance () {
return this.instance || (this.instance = new this()) return this.instance || (this.instance = new this())
} }

View File

@ -270,6 +270,59 @@ describe('Test AP cleaner', function () {
await checkRemote('kyle') await checkRemote('kyle')
}) })
// Checks that likes/comments coming from a remote server are cleaned up once that
// server has been unreachable for several consecutive runs of the AP cleaner job
it('Should remove unavailable remote resources', async function () {
this.timeout(240000)
// State expected while the remote resources are still kept:
// one like and one comment thread per server (presumably 3 servers - TODO confirm)
async function expectNotDeleted () {
{
const video = await servers[0].videos.get({ id: uuid })
expect(video.likes).to.equal(3)
expect(video.dislikes).to.equal(0)
}
{
const { total } = await servers[0].comments.listThreads({ videoId: uuid })
expect(total).to.equal(3)
}
}
// State expected after the cleaner removed the resources of the killed server
async function expectDeleted () {
{
const video = await servers[0].videos.get({ id: uuid })
// NOTE(review): still expects 3 likes after the cleanup, same as expectNotDeleted() - confirm this is intended
expect(video.likes).to.equal(3)
expect(video.dislikes).to.equal(0)
}
{
// NOTE(review): threads are counted on videoUUID1, not on the `uuid` video created by this test - confirm this is intended
const { total } = await servers[0].comments.listThreads({ videoId: videoUUID1 })
expect(total).to.equal(2)
}
}
// Upload a new video on server 1, then let every server like it and comment on it
const uuid = (await servers[0].videos.quickUpload({ name: 'server 1 video 2' })).uuid
await waitJobs(servers)
for (const server of servers) {
await server.videos.rate({ id: uuid, rating: 'like' })
await server.comments.createThread({ videoId: uuid, text: 'comment' })
}
await waitJobs(servers)
await expectNotDeleted()
// Make the second server unreachable
await servers[1].kill()
// Shortly after the kill, the resources must still be there
// (presumably the unavailability threshold is not reached yet - verify against the cleaner period used in tests)
await wait(5000)
await expectNotDeleted()
// Wait longer so the cleaner can run more times against the dead server
await wait(15000)
await expectDeleted()
})
after(async function () { after(async function () {
await cleanupTests(servers) await cleanupTests(servers)
}) })