More robust channel sync

This commit is contained in:
Chocobozzz 2024-02-16 10:14:12 +01:00
parent 22ab711501
commit 48f1d4b186
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
5 changed files with 54 additions and 27 deletions

View File

@ -61,7 +61,7 @@ export class VideoImportService {
}
return this.authHttp
.get<ResultList<VideoImport>>(UserService.BASE_USERS_URL + '/me/videos/imports', { params })
.get<ResultList<VideoImport>>(UserService.BASE_USERS_URL + 'me/videos/imports', { params })
.pipe(
switchMap(res => this.extractVideoImports(res)),
catchError(err => this.restExtractor.handleError(err))

View File

@ -232,12 +232,23 @@ describe('Test video imports', function () {
})
it('Should search in my imports', async function () {
const { total, data: videoImports } = await servers[0].imports.getMyVideoImports({ search: 'peertube2' })
{
const { total, data } = await servers[0].imports.getMyVideoImports({ search: 'peertube2' })
expect(total).to.equal(1)
expect(videoImports).to.have.lengthOf(1)
expect(data).to.have.lengthOf(1)
expect(videoImports[0].magnetUri).to.equal(FIXTURE_URLS.magnet)
expect(videoImports[0].video.name).to.equal('super peertube2 video')
expect(data[0].magnetUri).to.equal(FIXTURE_URLS.magnet)
expect(data[0].video.name).to.equal('super peertube2 video')
}
{
const { total, data } = await servers[0].imports.getMyVideoImports({ search: FIXTURE_URLS.magnet })
expect(total).to.equal(1)
expect(data).to.have.lengthOf(1)
expect(data[0].magnetUri).to.equal(FIXTURE_URLS.magnet)
expect(data[0].video.name).to.equal('super peertube2 video')
}
})
it('Should have the video listed on the two instances', async function () {

View File

@ -26,7 +26,7 @@ export async function processVideoChannelImport (job: Job) {
channelSync = await VideoChannelSyncModel.loadWithChannel(payload.partOfChannelSyncId)
if (!channelSync) {
throw new Error('Unlnown channel sync specified in videos channel import')
throw new Error('Unknown channel sync specified in videos channel import')
}
}

View File

@ -1,4 +1,4 @@
import { logger } from '@server/helpers/logger.js'
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
import { YoutubeDLWrapper } from '@server/helpers/youtube-dl/index.js'
import { CONFIG } from '@server/initializers/config.js'
import { buildYoutubeDLImport } from '@server/lib/video-pre-import.js'
@ -9,6 +9,8 @@ import { VideoChannelSyncState, VideoPrivacy } from '@peertube/peertube-models'
import { CreateJobArgument, JobQueue } from './job-queue/index.js'
import { ServerConfigManager } from './server-config-manager.js'
const lTags = loggerTagsFactory('channel-synchronization')
export async function synchronizeChannel (options: {
channel: MChannelAccountDefault
externalChannelUrl: string
@ -36,7 +38,7 @@ export async function synchronizeChannel (options: {
logger.info(
'Fetched %d candidate URLs for sync channel %s.',
targetUrls.length, channel.Actor.preferredUsername, { targetUrls }
targetUrls.length, channel.Actor.preferredUsername, { targetUrls, ...lTags() }
)
if (targetUrls.length === 0) {
@ -51,6 +53,9 @@ export async function synchronizeChannel (options: {
const children: CreateJobArgument[] = []
for (const targetUrl of targetUrls) {
logger.debug(`Import candidate: ${targetUrl}`, lTags())
try {
if (await skipImport(channel, targetUrl, onlyAfter)) continue
const { job } = await buildYoutubeDLImport({
@ -64,6 +69,9 @@ export async function synchronizeChannel (options: {
})
children.push(job)
} catch (err) {
logger.error(`Cannot build import for ${targetUrl} in channel ${channel.name}`, { err, ...lTags() })
}
}
// Will update the channel sync status
@ -76,7 +84,7 @@ export async function synchronizeChannel (options: {
await JobQueue.Instance.createJobWithChildren(parent, children)
} catch (err) {
logger.error(`Failed to import ${externalChannelUrl} in channel ${channel.name}`, { err })
logger.error(`Failed to import ${externalChannelUrl} in channel ${channel.name}`, { err, ...lTags() })
channelSync.state = VideoChannelSyncState.FAILED
await channelSync.save()
}
@ -86,7 +94,7 @@ export async function synchronizeChannel (options: {
async function skipImport (channel: MChannel, targetUrl: string, onlyAfter?: Date) {
if (await VideoImportModel.urlAlreadyImported(channel.id, targetUrl)) {
logger.debug('%s is already imported for channel %s, skipping video channel synchronization.', targetUrl, channel.name)
logger.debug('%s is already imported for channel %s, skipping video channel synchronization.', targetUrl, channel.name, lTags())
return true
}

View File

@ -159,7 +159,7 @@ export class VideoImportModel extends Model<Partial<AttributesOnly<VideoImportMo
}) {
const { userId, start, count, sort, targetUrl, videoChannelSyncId, search } = options
const where: WhereOptions = { userId }
const where: WhereOptions = [ { userId } ]
const include: IncludeOptions[] = [
{
attributes: [ 'id' ],
@ -172,14 +172,22 @@ export class VideoImportModel extends Model<Partial<AttributesOnly<VideoImportMo
}
]
if (targetUrl) where['targetUrl'] = targetUrl
if (videoChannelSyncId) where['videoChannelSyncId'] = videoChannelSyncId
if (targetUrl) where.push({ targetUrl })
if (videoChannelSyncId) where.push({ videoChannelSyncId })
if (search) {
include.push({
model: defaultVideoScope(),
required: true,
where: searchAttribute(search, 'name')
required: false
})
where.push({
[Op.or]: [
searchAttribute(search, '$Video.name$'),
searchAttribute(search, 'targetUrl'),
searchAttribute(search, 'torrentName'),
searchAttribute(search, 'magnetUri')
]
})
} else {
include.push({