import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' import { min } from 'lodash' import { dirname } from 'path' import { Readable } from 'stream' import { CompletedPart, CompleteMultipartUploadCommand, CreateMultipartUploadCommand, CreateMultipartUploadCommandInput, DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, PutObjectCommand, PutObjectCommandInput, UploadPartCommand } from '@aws-sdk/client-s3' import { pipelinePromise } from '@server/helpers/core-utils' import { isArray } from '@server/helpers/custom-validators/misc' import { logger } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { getPrivateUrl } from '../urls' import { getClient } from './client' import { lTags } from './logger' type BucketInfo = { BUCKET_NAME: string PREFIX?: string } async function storeObject (options: { inputPath: string objectStorageKey: string bucketInfo: BucketInfo }): Promise { const { inputPath, objectStorageKey, bucketInfo } = options logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) const stats = await stat(inputPath) // If bigger than max allowed size we do a multipart upload if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) } const fileStream = createReadStream(inputPath) return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) } async function removeObject (filename: string, bucketInfo: BucketInfo) { const command = new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: buildKey(filename, bucketInfo) }) return getClient().send(command) } async function removePrefix (prefix: string, bucketInfo: BucketInfo) { const s3Client = getClient() const commandPrefix = bucketInfo.PREFIX + prefix const listCommand = new ListObjectsV2Command({ Bucket: bucketInfo.BUCKET_NAME, Prefix: commandPrefix }) const listedObjects = await s3Client.send(listCommand) // FIXME: use bulk delete when s3ninja will support this operation // const deleteParams = { // Bucket: bucketInfo.BUCKET_NAME, // Delete: { Objects: [] } // } if (isArray(listedObjects.Contents) !== true) { const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` logger.error(message, { response: listedObjects, ...lTags() }) throw new Error(message) } for (const object of listedObjects.Contents) { const command = new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: object.Key }) await s3Client.send(command) // FIXME: use bulk delete when s3ninja will support this operation // deleteParams.Delete.Objects.push({ Key: object.Key }) } // FIXME: use bulk delete when s3ninja will support this operation // const deleteCommand = new DeleteObjectsCommand(deleteParams) // await s3Client.send(deleteCommand) // Repeat if not all objects could be listed at once (limit of 1000?) if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) } async function makeAvailable (options: { key: string destination: string bucketInfo: BucketInfo }) { const { key, destination, bucketInfo } = options await ensureDir(dirname(options.destination)) const command = new GetObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: buildKey(key, bucketInfo) }) const response = await getClient().send(command) const file = createWriteStream(destination) await pipelinePromise(response.Body as Readable, file) file.close() } function buildKey (key: string, bucketInfo: BucketInfo) { return bucketInfo.PREFIX + key } // --------------------------------------------------------------------------- export { BucketInfo, buildKey, storeObject, removeObject, removePrefix, makeAvailable } // --------------------------------------------------------------------------- async function objectStoragePut (options: { objectStorageKey: string content: ReadStream bucketInfo: BucketInfo }) { const { objectStorageKey, content, bucketInfo } = options const input: PutObjectCommandInput = { Bucket: bucketInfo.BUCKET_NAME, Key: buildKey(objectStorageKey, bucketInfo), Body: content } if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) { input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL } const command = new PutObjectCommand(input) await getClient().send(command) return getPrivateUrl(bucketInfo, objectStorageKey) } async function multiPartUpload (options: { inputPath: string objectStorageKey: string bucketInfo: BucketInfo }) { const { objectStorageKey, inputPath, bucketInfo } = options const key = buildKey(objectStorageKey, bucketInfo) const s3Client = getClient() const statResult = await stat(inputPath) const input: CreateMultipartUploadCommandInput = { Bucket: bucketInfo.BUCKET_NAME, Key: buildKey(objectStorageKey, bucketInfo) } if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) { input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL } const createMultipartCommand = new CreateMultipartUploadCommand(input) const createResponse = await s3Client.send(createMultipartCommand) const fd = await open(inputPath, 'r') let partNumber = 1 const parts: CompletedPart[] = [] const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART for (let start = 0; start < statResult.size; start += partSize) { logger.debug( 'Uploading part %d of file to %s%s in bucket %s', partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() ) // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released // The s3 sdk needs to know the length of the http body beforehand, but doesn't support // streams with start and end set, so it just tries to stat the file in stream.path. // This fails for us because we only want to send part of the file. The stream type // is modified so we can set the byteLength here, which s3 detects because array buffers // have this field set const stream: ReadStream & { byteLength: number } = createReadStream( inputPath, { fd, autoClose: false, start, end: (start + partSize) - 1 } ) as ReadStream & { byteLength: number } // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength stream.byteLength = min([ statResult.size - start, partSize ]) const uploadPartCommand = new UploadPartCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: key, UploadId: createResponse.UploadId, PartNumber: partNumber, Body: stream }) const uploadResponse = await s3Client.send(uploadPartCommand) parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber }) partNumber += 1 } await close(fd) const completeUploadCommand = new CompleteMultipartUploadCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: key, UploadId: createResponse.UploadId, MultipartUpload: { Parts: parts } }) await s3Client.send(completeUploadCommand) logger.debug( 'Completed %s%s in bucket %s in %d parts', bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() ) return getPrivateUrl(bucketInfo, objectStorageKey) }