From 156cdbac227b5501116a66063812b9fc9b0e89c7 Mon Sep 17 00:00:00 2001 From: kontrollanten <6680299+kontrollanten@users.noreply.github.com> Date: Tue, 19 Apr 2022 15:22:18 +0200 Subject: [PATCH] object-storage: @aws-sdk/lib-storage for multipart (#4903) * object-storage: @aws-sdk/lib-storage for multipart * gitignore: add .DS_Store * test(object-storage): remove only * test(object-storage/multipart): generate video * fix lint issue * test(obj-storage/video): ensure file size * Styling Co-authored-by: Chocobozzz --- .gitignore | 1 + package.json | 3 +- .../shared/object-storage-helpers.ts | 89 ++++--------------- server/tests/api/object-storage/videos.ts | 42 +++++++-- yarn.lock | 43 +++++++-- 5 files changed, 92 insertions(+), 86 deletions(-) diff --git a/.gitignore b/.gitignore index 5e06248f1..c6029ad65 100644 --- a/.gitignore +++ b/.gitignore @@ -47,6 +47,7 @@ yarn-error.log /*.zip /*.tar.xz /*.asc +*.DS_Store /server/tools/import-mediacore.ts /docker-volume/ /init.mp4 diff --git a/package.json b/package.json index 97438afdb..360bd781f 100644 --- a/package.json +++ b/package.json @@ -78,7 +78,8 @@ }, "dependencies": { "@aws-sdk/client-s3": "^3.23.0", - "@babel/parser": "7.17.9", + "@aws-sdk/lib-storage": "^3.72.0", + "@babel/parser": "7.17.8", "@peertube/feed": "^5.0.1", "@peertube/http-signature": "^1.4.0", "@uploadx/core": "^5.1.0", diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts index ecb82856e..a2de92532 100644 --- a/server/lib/object-storage/shared/object-storage-helpers.ts +++ b/server/lib/object-storage/shared/object-storage-helpers.ts @@ -1,19 +1,14 @@ -import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' -import { min } from 'lodash' +import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra' import { dirname } from 'path' import { Readable } from 'stream' import { - CompletedPart, - CompleteMultipartUploadCommand, - CreateMultipartUploadCommand, - CreateMultipartUploadCommandInput, DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, PutObjectCommand, - PutObjectCommandInput, - UploadPartCommand + PutObjectCommandInput } from '@aws-sdk/client-s3' +import { Upload } from '@aws-sdk/lib-storage' import { pipelinePromise } from '@server/helpers/core-utils' import { isArray } from '@server/helpers/custom-validators/misc' import { logger } from '@server/helpers/logger' @@ -37,13 +32,12 @@ async function storeObject (options: { logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) const stats = await stat(inputPath) + const fileStream = createReadStream(inputPath) - // If bigger than max allowed size we do a multipart upload if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { - return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) + return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo }) } - const fileStream = createReadStream(inputPath) return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) } @@ -163,18 +157,14 @@ async function objectStoragePut (options: { } async function multiPartUpload (options: { - inputPath: string + content: ReadStream objectStorageKey: string bucketInfo: BucketInfo }) { - const { objectStorageKey, inputPath, bucketInfo } = options + const { content, objectStorageKey, bucketInfo } = options - const key = buildKey(objectStorageKey, bucketInfo) - const s3Client = getClient() - - const statResult = await stat(inputPath) - - const input: CreateMultipartUploadCommandInput = { + const input: PutObjectCommandInput = { + Body: content, Bucket: bucketInfo.BUCKET_NAME, Key: buildKey(objectStorageKey, bucketInfo) } @@ -183,60 +173,19 @@ async function multiPartUpload (options: { input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL } - const createMultipartCommand = new CreateMultipartUploadCommand(input) - const createResponse = await s3Client.send(createMultipartCommand) - - const fd = await open(inputPath, 'r') - let partNumber = 1 - const parts: CompletedPart[] = [] - const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART - - for (let start = 0; start < statResult.size; start += partSize) { - logger.debug( - 'Uploading part %d of file to %s%s in bucket %s', - partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() - ) - - // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released - // The s3 sdk needs to know the length of the http body beforehand, but doesn't support - // streams with start and end set, so it just tries to stat the file in stream.path. - // This fails for us because we only want to send part of the file. The stream type - // is modified so we can set the byteLength here, which s3 detects because array buffers - // have this field set - const stream: ReadStream & { byteLength: number } = - createReadStream( - inputPath, - { fd, autoClose: false, start, end: (start + partSize) - 1 } - ) as ReadStream & { byteLength: number } - - // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength - stream.byteLength = min([ statResult.size - start, partSize ]) - - const uploadPartCommand = new UploadPartCommand({ - Bucket: bucketInfo.BUCKET_NAME, - Key: key, - UploadId: createResponse.UploadId, - PartNumber: partNumber, - Body: stream - }) - const uploadResponse = await s3Client.send(uploadPartCommand) - - parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber }) - partNumber += 1 - } - await close(fd) - - const completeUploadCommand = new CompleteMultipartUploadCommand({ - Bucket: bucketInfo.BUCKET_NAME, - Key: key, - UploadId: createResponse.UploadId, - MultipartUpload: { Parts: parts } + const parallelUploads3 = new Upload({ + client: getClient(), + queueSize: 4, + partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART, + leavePartsOnError: false, + params: input }) - await s3Client.send(completeUploadCommand) + + await parallelUploads3.done() logger.debug( - 'Completed %s%s in bucket %s in %d parts', - bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() + 'Completed %s%s in bucket %s', + bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() ) return getPrivateUrl(bucketInfo, objectStorageKey) diff --git a/server/tests/api/object-storage/videos.ts b/server/tests/api/object-storage/videos.ts index 498efcb17..22ad06305 100644 --- a/server/tests/api/object-storage/videos.ts +++ b/server/tests/api/object-storage/videos.ts @@ -1,9 +1,17 @@ /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ import 'mocha' +import bytes from 'bytes' import * as chai from 'chai' +import { stat } from 'fs-extra' import { merge } from 'lodash' -import { checkTmpIsEmpty, expectLogDoesNotContain, expectStartWith, MockObjectStorage } from '@server/tests/shared' +import { + checkTmpIsEmpty, + expectLogDoesNotContain, + expectStartWith, + generateHighBitrateVideo, + MockObjectStorage +} from '@server/tests/shared' import { areObjectStorageTestsDisabled } from '@shared/core-utils' import { HttpStatusCode, VideoDetails } from '@shared/models' import { @@ -107,6 +115,10 @@ async function checkFiles (options: { } function runTestSuite (options: { + fixture?: string + + maxUploadPart?: string + playlistBucket: string playlistPrefix?: string @@ -114,10 +126,9 @@ function runTestSuite (options: { webtorrentPrefix?: string useMockBaseUrl?: boolean - - maxUploadPart?: string }) { const mockObjectStorage = new MockObjectStorage() + const { fixture } = options let baseMockUrl: string let servers: PeerTubeServer[] @@ -144,7 +155,7 @@ function runTestSuite (options: { credentials: ObjectStorageCommand.getCredentialsConfig(), - max_upload_part: options.maxUploadPart || '2MB', + max_upload_part: options.maxUploadPart || '5MB', streaming_playlists: { bucket_name: options.playlistBucket, @@ -181,7 +192,7 @@ function runTestSuite (options: { it('Should upload a video and move it to the object storage without transcoding', async function () { this.timeout(40000) - const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1' }) + const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1', fixture }) uuidsToDelete.push(uuid) await waitJobs(servers) @@ -197,7 +208,7 @@ function runTestSuite (options: { it('Should upload a video and move it to the object storage with transcoding', async function () { this.timeout(120000) - const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2' }) + const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2', fixture }) uuidsToDelete.push(uuid) await waitJobs(servers) @@ -390,12 +401,25 @@ describe('Object storage for videos', function () { }) }) - describe('Test object storage with small upload part', function () { + describe('Test object storage with file bigger than upload part', function () { + let fixture: string + const maxUploadPart = '5MB' + + before(async function () { + fixture = await generateHighBitrateVideo() + + const { size } = await stat(fixture) + + if (bytes.parse(maxUploadPart) > size) { + throw Error(`Fixture file is too small (${size}) to make sense for this test.`) + } + }) + runTestSuite({ + maxUploadPart, playlistBucket: 'streaming-playlists', webtorrentBucket: 'videos', - - maxUploadPart: '5KB' + fixture }) }) }) diff --git a/yarn.lock b/yarn.lock index a2e5def80..104f4c241 100644 --- a/yarn.lock +++ b/yarn.lock @@ -484,6 +484,16 @@ dependencies: tslib "^2.3.1" +"@aws-sdk/lib-storage@^3.72.0": + version "3.72.0" + resolved "https://registry.yarnpkg.com/@aws-sdk/lib-storage/-/lib-storage-3.72.0.tgz#035c577e306d6472aa5cb15220936262cb394763" + integrity sha512-z2L//IMN9fkXMhFyC0F9SXTH0oHA7zsOsLOyQS2hqKXAE3TGTK6d0hj6vmut4RH0wGzXOQ9zrh0DexAVdv29pA== + dependencies: + buffer "5.6.0" + events "3.3.0" + stream-browserify "3.0.0" + tslib "^2.3.1" + "@aws-sdk/md5-js@3.58.0": version "3.58.0" resolved "https://registry.yarnpkg.com/@aws-sdk/md5-js/-/md5-js-3.58.0.tgz#a7ecf5cc8a81ce247fd620f8c981802d0427737f" @@ -989,7 +999,12 @@ resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.16.4.tgz#d5f92f57cf2c74ffe9b37981c0e72fee7311372e" integrity sha512-6V0qdPUaiVHH3RtZeLIsc+6pDhbYzHR8ogA8w+f+Wc77DuXto19g2QUwveINoS34Uw+W8/hQDGJCx+i4n7xcng== -"@babel/parser@7.17.9", "@babel/parser@^7.16.4", "@babel/parser@^7.16.7", "@babel/parser@^7.17.9", "@babel/parser@^7.6.0", "@babel/parser@^7.9.6": +"@babel/parser@7.17.8": + version "7.17.8" + resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.17.8.tgz#2817fb9d885dd8132ea0f8eb615a6388cca1c240" + integrity sha512-BoHhDJrJXqcg+ZL16Xv39H9n+AqJ4pcDrQBGZN+wHxIysrLZ3/ECwCBUch/1zUNhnsXULcONU3Ei5Hmkfk6kiQ== + +"@babel/parser@^7.16.4", "@babel/parser@^7.16.7", "@babel/parser@^7.17.9", "@babel/parser@^7.6.0", "@babel/parser@^7.9.6": version "7.17.9" resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.17.9.tgz#9c94189a6062f0291418ca021077983058e171ef" integrity sha512-vqUSBLP8dQHFPdPi9bc5GK9vRkYHJ49fsZdtoJ8EQ8ibpwk5rPKfvNIwChB0KVXcIjcepEBBd2VHC5r9Gy8ueg== @@ -2555,7 +2570,7 @@ base32.js@0.1.0: resolved "https://registry.yarnpkg.com/base32.js/-/base32.js-0.1.0.tgz#b582dec693c2f11e893cf064ee6ac5b6131a2202" integrity sha1-tYLexpPC8R6JPPBk7mrFthMaIgI= -base64-js@^1.2.0, base64-js@^1.3.1: +base64-js@^1.0.2, base64-js@^1.2.0, base64-js@^1.3.1: version "1.5.1" resolved "https://registry.yarnpkg.com/base64-js/-/base64-js-1.5.1.tgz#1b1b440160a5bf7ad40b650f095963481903930a" integrity sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA== @@ -2818,6 +2833,14 @@ buffer-writer@2.0.0: resolved "https://registry.yarnpkg.com/buffer-writer/-/buffer-writer-2.0.0.tgz#ce7eb81a38f7829db09c873f2fbb792c0c98ec04" integrity sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw== +buffer@5.6.0: + version "5.6.0" + resolved "https://registry.yarnpkg.com/buffer/-/buffer-5.6.0.tgz#a31749dc7d81d84db08abf937b6b8c4033f62786" + integrity sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw== + dependencies: + base64-js "^1.0.2" + ieee754 "^1.1.4" + buffer@^5.2.0: version "5.7.1" resolved "https://registry.yarnpkg.com/buffer/-/buffer-5.7.1.tgz#ba62e7c13133053582197160851a8f648e99eed0" @@ -4228,7 +4251,7 @@ event-target-shim@^5.0.0: resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789" integrity sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ== -events@^3.3.0: +events@3.3.0, events@^3.3.0: version "3.3.0" resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400" integrity sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q== @@ -5006,7 +5029,7 @@ iconv-lite@0.6.3: dependencies: safer-buffer ">= 2.1.2 < 3.0.0" -ieee754@^1.1.13, ieee754@^1.2.1: +ieee754@^1.1.13, ieee754@^1.1.4, ieee754@^1.2.1: version "1.2.1" resolved "https://registry.yarnpkg.com/ieee754/-/ieee754-1.2.1.tgz#8eb7a10a63fff25d15a57b001586d177d1b0d352" integrity sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA== @@ -5066,7 +5089,7 @@ inflight@^1.0.4: once "^1.3.0" wrappy "1" -inherits@2, inherits@2.0.4, inherits@^2.0.1, inherits@^2.0.3, inherits@^2.0.4, inherits@~2.0.1, inherits@~2.0.3: +inherits@2, inherits@2.0.4, inherits@^2.0.1, inherits@^2.0.3, inherits@^2.0.4, inherits@~2.0.1, inherits@~2.0.3, inherits@~2.0.4: version "2.0.4" resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c" integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== @@ -7353,7 +7376,7 @@ readable-stream@^2.0.0, readable-stream@^2.2.2, readable-stream@~2.3.6: string_decoder "~1.1.1" util-deprecate "~1.0.1" -readable-stream@^3.0.2, readable-stream@^3.0.6, readable-stream@^3.4.0, readable-stream@^3.6.0: +readable-stream@^3.0.2, readable-stream@^3.0.6, readable-stream@^3.4.0, readable-stream@^3.5.0, readable-stream@^3.6.0: version "3.6.0" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198" integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA== @@ -8017,6 +8040,14 @@ statuses@1.5.0, "statuses@>= 1.5.0 < 2", statuses@~1.5.0: resolved "https://registry.yarnpkg.com/statuses/-/statuses-1.5.0.tgz#161c7dac177659fd9811f43771fa99381478628c" integrity sha1-Fhx9rBd2Wf2YEfQ3cfqZOBR4Yow= +stream-browserify@3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/stream-browserify/-/stream-browserify-3.0.0.tgz#22b0a2850cdf6503e73085da1fc7b7d0c2122f2f" + integrity sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA== + dependencies: + inherits "~2.0.4" + readable-stream "^3.5.0" + stream-combiner@~0.0.4: version "0.0.4" resolved "https://registry.yarnpkg.com/stream-combiner/-/stream-combiner-0.0.4.tgz#4d5e433c185261dde623ca3f44c586bcf5c4ad14"