From 276250f0a36e00373166d91d539e5220d6f158c7 Mon Sep 17 00:00:00 2001 From: Rigel Kent Date: Mon, 25 Oct 2021 17:42:20 +0200 Subject: [PATCH] prevent multiple post-process triggering of upload-resumable (#4175) * prevent multiple post-process triggering of upload-resumable * switch from 409 to 503 for upload being processed * Improve resumable upload check Co-authored-by: Chocobozzz --- .../video-upload.component.ts | 7 +++-- server/controllers/api/videos/upload.ts | 18 ++++++++---- server/helpers/upload.ts | 9 +++++- server/initializers/constants.ts | 3 ++ server/lib/job-queue/job-queue.ts | 2 ++ server/lib/redis.ts | 27 +++++++++++++++++- .../middlewares/validators/videos/videos.ts | 28 +++++++++++++++++-- server/tests/api/videos/resumable-upload.ts | 15 ++++++++++ shared/models/server/job.model.ts | 4 +++ support/doc/api/openapi.yaml | 7 +++++ 10 files changed, 107 insertions(+), 13 deletions(-) diff --git a/client/src/app/+videos/+video-edit/video-add-components/video-upload.component.ts b/client/src/app/+videos/+video-edit/video-add-components/video-upload.component.ts index 6f72a07c4..91d89a535 100644 --- a/client/src/app/+videos/+video-edit/video-add-components/video-upload.component.ts +++ b/client/src/app/+videos/+video-edit/video-add-components/video-upload.component.ts @@ -82,9 +82,10 @@ export class VideoUploadComponent extends VideoSend implements OnInit, OnDestroy uploaderClass: UploaderXFormData, chunkSize, retryConfig: { - maxAttempts: 6, - shouldRetry: (code: number) => { - return code < 400 || code >= 501 + maxAttempts: 30, // maximum attempts for 503 codes, otherwise set to 6, see below + maxDelay: 120_000, // 2 min + shouldRetry: (code: number, attempts: number) => { + return code === HttpStatusCode.SERVICE_UNAVAILABLE_503 || ((code < 400 || code > 500) && attempts < 6) } } } diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 55cb9cf20..02aadd426 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts @@ -7,6 +7,7 @@ import { uuidToShort } from '@server/helpers/uuid' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { generateWebTorrentVideoFilename } from '@server/lib/paths' +import { Redis } from '@server/lib/redis' import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob, @@ -94,7 +95,7 @@ uploadRouter.delete('/upload-resumable', uploadRouter.put('/upload-resumable', openapiOperationDoc({ operationId: 'uploadResumable' }), authenticate, - uploadxMiddleware, // uploadx doesn't use call next() before the file upload completes + uploadxMiddleware, // uploadx doesn't next() before the file upload completes asyncMiddleware(videosAddResumableValidator), asyncMiddleware(addVideoResumable) ) @@ -122,15 +123,20 @@ export async function addVideoLegacy (req: express.Request, res: express.Respons const videoInfo: VideoCreate = req.body const files = req.files - return addVideo({ res, videoPhysicalFile, videoInfo, files }) + const response = await addVideo({ res, videoPhysicalFile, videoInfo, files }) + + return res.json(response) } -export async function addVideoResumable (_req: express.Request, res: express.Response) { +export async function addVideoResumable (req: express.Request, res: express.Response) { const videoPhysicalFile = res.locals.videoFileResumable const videoInfo = videoPhysicalFile.metadata const files = { previewfile: videoInfo.previewfile } - return addVideo({ res, videoPhysicalFile, videoInfo, files }) + const response = await addVideo({ res, videoPhysicalFile, videoInfo, files }) + await Redis.Instance.setUploadSession(req.query.upload_id, response) + + return res.json(response) } async function addVideo (options: { @@ -225,13 +231,13 @@ async function addVideo (options: { Hooks.runAction('action:api.video.uploaded', { video: videoCreated }) - return res.json({ + return { video: { id: videoCreated.id, shortUUID: uuidToShort(videoCreated.uuid), uuid: videoCreated.uuid } - }) + } } async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) { diff --git a/server/helpers/upload.ts b/server/helpers/upload.ts index 3cb17edd0..c94c7ab82 100644 --- a/server/helpers/upload.ts +++ b/server/helpers/upload.ts @@ -1,4 +1,5 @@ import { join } from 'path' +import { JobQueue } from '@server/lib/job-queue' import { RESUMABLE_UPLOAD_DIRECTORY } from '../initializers/constants' function getResumableUploadPath (filename?: string) { @@ -7,8 +8,14 @@ function getResumableUploadPath (filename?: string) { return RESUMABLE_UPLOAD_DIRECTORY } +function scheduleDeleteResumableUploadMetaFile (filepath: string) { + const payload = { filepath } + JobQueue.Instance.createJob({ type: 'delete-resumable-upload-meta-file', payload }, { delay: 900 * 1000 }) // executed in 15 min +} + // --------------------------------------------------------------------------- export { - getResumableUploadPath + getResumableUploadPath, + scheduleDeleteResumableUploadMetaFile } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 87a74a32c..f6c19dab4 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -665,6 +665,8 @@ const RESUMABLE_UPLOAD_DIRECTORY = join(CONFIG.STORAGE.TMP_DIR, 'resumable-uploa const HLS_STREAMING_PLAYLIST_DIRECTORY = join(CONFIG.STORAGE.STREAMING_PLAYLISTS_DIR, 'hls') const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls') +const RESUMABLE_UPLOAD_SESSION_LIFETIME = SCHEDULER_INTERVALS_MS.REMOVE_DANGLING_RESUMABLE_UPLOADS + const VIDEO_LIVE = { EXTENSION: '.ts', CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes @@ -838,6 +840,7 @@ export { LAZY_STATIC_PATHS, SEARCH_INDEX, RESUMABLE_UPLOAD_DIRECTORY, + RESUMABLE_UPLOAD_SESSION_LIFETIME, HLS_REDUNDANCY_DIRECTORY, P2P_MEDIA_LOADER_PEER_VERSION, ACTOR_IMAGES_SIZE, diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 4cda12b57..53d6b6a9c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -8,6 +8,7 @@ import { ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, ActorKeysPayload, + DeleteResumableUploadMetaFilePayload, EmailPayload, JobState, JobType, @@ -52,6 +53,7 @@ type CreateJobArgument = { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'actor-keys', payload: ActorKeysPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } | + { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } export type CreateJobOptions = { diff --git a/server/lib/redis.ts b/server/lib/redis.ts index d1d88d853..46617b07e 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -9,7 +9,8 @@ import { USER_PASSWORD_CREATE_LIFETIME, VIEW_LIFETIME, WEBSERVER, - TRACKER_RATE_LIMITS + TRACKER_RATE_LIMITS, + RESUMABLE_UPLOAD_SESSION_LIFETIME } from '../initializers/constants' import { CONFIG } from '../initializers/config' @@ -202,6 +203,30 @@ class Redis { ]) } + /* ************ Resumable uploads final responses ************ */ + + setUploadSession (uploadId: string, response?: { video: { id: number, shortUUID: string, uuid: string } }) { + return this.setValue( + 'resumable-upload-' + uploadId, + response + ? JSON.stringify(response) + : '', + RESUMABLE_UPLOAD_SESSION_LIFETIME + ) + } + + doesUploadSessionExist (uploadId: string) { + return this.exists('resumable-upload-' + uploadId) + } + + async getUploadSession (uploadId: string) { + const value = await this.getValue('resumable-upload-' + uploadId) + + return value + ? JSON.parse(value) + : '' + } + /* ************ Keys generation ************ */ generateCachedRouteKey (req: express.Request) { diff --git a/server/middlewares/validators/videos/videos.ts b/server/middlewares/validators/videos/videos.ts index 23ee9778a..e486887a7 100644 --- a/server/middlewares/validators/videos/videos.ts +++ b/server/middlewares/validators/videos/videos.ts @@ -1,6 +1,8 @@ import express from 'express' import { body, header, param, query, ValidationChain } from 'express-validator' +import { isTestInstance } from '@server/helpers/core-utils' import { getResumableUploadPath } from '@server/helpers/upload' +import { Redis } from '@server/lib/redis' import { isAbleToUploadVideo } from '@server/lib/user' import { getServerActor } from '@server/models/application/application' import { ExpressPromiseHandler } from '@server/types/express' @@ -105,12 +107,34 @@ const videosAddLegacyValidator = getCommonVideoEditAttributes().concat([ const videosAddResumableValidator = [ async (req: express.Request, res: express.Response, next: express.NextFunction) => { const user = res.locals.oauth.token.User - const body: express.CustomUploadXFile = req.body const file = { ...body, duration: undefined, path: getResumableUploadPath(body.id), filename: body.metadata.filename } - const cleanup = () => deleteFileAndCatch(file.path) + const uploadId = req.query.upload_id + const sessionExists = await Redis.Instance.doesUploadSessionExist(uploadId) + + if (sessionExists) { + const sessionResponse = await Redis.Instance.getUploadSession(uploadId) + + if (!sessionResponse) { + res.setHeader('Retry-After', 300) // ask to retry after 5 min, knowing the upload_id is kept for up to 15 min after completion + + return res.fail({ + status: HttpStatusCode.SERVICE_UNAVAILABLE_503, + message: 'The upload is already being processed' + }) + } + + if (isTestInstance()) { + res.setHeader('x-resumable-upload-cached', 'true') + } + + return res.json(sessionResponse) + } + + await Redis.Instance.setUploadSession(uploadId) + if (!await doesVideoChannelOfAccountExist(file.metadata.channelId, user, res)) return cleanup() try { diff --git a/server/tests/api/videos/resumable-upload.ts b/server/tests/api/videos/resumable-upload.ts index 59970aa94..6b5e0c09d 100644 --- a/server/tests/api/videos/resumable-upload.ts +++ b/server/tests/api/videos/resumable-upload.ts @@ -180,6 +180,21 @@ describe('Test resumable upload', function () { await sendChunks({ pathUploadId: uploadId, expectedStatus, contentRangeBuilder, contentLength: size }) await checkFileSize(uploadId, 0) }) + + it('Should be able to accept 2 PUT requests', async function () { + const uploadId = await prepareUpload() + + const result1 = await sendChunks({ pathUploadId: uploadId }) + const result2 = await sendChunks({ pathUploadId: uploadId }) + + expect(result1.body.video.uuid).to.exist + expect(result1.body.video.uuid).to.equal(result2.body.video.uuid) + + expect(result1.headers['x-resumable-upload-cached']).to.not.exist + expect(result2.headers['x-resumable-upload-cached']).to.equal('true') + + await checkFileSize(uploadId, null) + }) }) after(async function () { diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index ff96283a4..12e0fcf85 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -138,6 +138,10 @@ export interface ActorKeysPayload { actorId: number } +export interface DeleteResumableUploadMetaFilePayload { + filepath: string +} + export interface MoveObjectStoragePayload { videoUUID: string isNewVideo: boolean diff --git a/support/doc/api/openapi.yaml b/support/doc/api/openapi.yaml index d6f8c1ae0..ef4e7d04d 100644 --- a/support/doc/api/openapi.yaml +++ b/support/doc/api/openapi.yaml @@ -2081,6 +2081,13 @@ paths: description: video unreadable '429': description: too many concurrent requests + '503': + description: upload is already being processed + headers: + 'Retry-After': + schema: + type: number + example: 300 delete: summary: Cancel the resumable upload of a video, deleting any data uploaded so far description: Uses [a resumable protocol](https://github.com/kukhariev/node-uploadx/blob/master/proto.md) to cancel the upload of a video