More robust quota check
Avoid concurrency issues with permanent lives
This commit is contained in:
parent
fa3da7a623
commit
3f0ceab06e
|
@ -313,7 +313,7 @@ class LiveManager {
|
|||
const liveSession = await this.saveStartingSession(videoLive)
|
||||
|
||||
const user = await UserModel.loadByLiveId(videoLive.id)
|
||||
LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id)
|
||||
LiveQuotaStore.Instance.addNewLive(user.id, sessionId)
|
||||
|
||||
const muxingSession = new MuxingSession({
|
||||
context: this.getContext(),
|
||||
|
@ -359,7 +359,7 @@ class LiveManager {
|
|||
muxingSession.on('after-cleanup', ({ videoUUID }) => {
|
||||
this.muxingSessions.delete(sessionId)
|
||||
|
||||
LiveQuotaStore.Instance.removeLive(user.id, videoLive.id)
|
||||
LiveQuotaStore.Instance.removeLive(user.id, sessionId)
|
||||
|
||||
muxingSession.destroy()
|
||||
|
||||
|
|
|
@ -2,31 +2,31 @@ class LiveQuotaStore {
|
|||
|
||||
private static instance: LiveQuotaStore
|
||||
|
||||
private readonly livesPerUser = new Map<number, { liveId: number, size: number }[]>()
|
||||
private readonly livesPerUser = new Map<number, { sessionId: string, size: number }[]>()
|
||||
|
||||
private constructor () {
|
||||
}
|
||||
|
||||
addNewLive (userId: number, liveId: number) {
|
||||
addNewLive (userId: number, sessionId: string) {
|
||||
if (!this.livesPerUser.has(userId)) {
|
||||
this.livesPerUser.set(userId, [])
|
||||
}
|
||||
|
||||
const currentUserLive = { liveId, size: 0 }
|
||||
const currentUserLive = { sessionId, size: 0 }
|
||||
const livesOfUser = this.livesPerUser.get(userId)
|
||||
livesOfUser.push(currentUserLive)
|
||||
}
|
||||
|
||||
removeLive (userId: number, liveId: number) {
|
||||
removeLive (userId: number, sessionId: string) {
|
||||
const newLivesPerUser = this.livesPerUser.get(userId)
|
||||
.filter(o => o.liveId !== liveId)
|
||||
.filter(o => o.sessionId !== sessionId)
|
||||
|
||||
this.livesPerUser.set(userId, newLivesPerUser)
|
||||
}
|
||||
|
||||
addQuotaTo (userId: number, liveId: number, size: number) {
|
||||
addQuotaTo (userId: number, sessionId: string, size: number) {
|
||||
const lives = this.livesPerUser.get(userId)
|
||||
const live = lives.find(l => l.liveId === liveId)
|
||||
const live = lives.find(l => l.sessionId === sessionId)
|
||||
|
||||
live.size += size
|
||||
}
|
||||
|
|
|
@ -271,7 +271,7 @@ class MuxingSession extends EventEmitter {
|
|||
try {
|
||||
const segmentStat = await stat(segmentPath)
|
||||
|
||||
LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.videoLive.id, segmentStat.size)
|
||||
LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.sessionId, segmentStat.size)
|
||||
|
||||
const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id)
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import { expect } from 'chai'
|
||||
import { wait } from '@shared/core-utils'
|
||||
import { LiveVideoError, VideoPrivacy } from '@shared/models'
|
||||
import { LiveVideoError, UserVideoQuota, VideoPrivacy } from '@shared/models'
|
||||
import {
|
||||
cleanupTests,
|
||||
ConfigCommand,
|
||||
|
@ -172,12 +172,18 @@ describe('Test live constraints', function () {
|
|||
const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ token: userAccessToken, videoId: userVideoLiveoId })
|
||||
|
||||
await servers[0].live.waitUntilPublished({ videoId: userVideoLiveoId })
|
||||
// Wait previous live cleanups
|
||||
await wait(3000)
|
||||
|
||||
const baseQuota = await servers[0].users.getMyQuotaUsed({ token: userAccessToken })
|
||||
|
||||
await wait(3000)
|
||||
let quotaUser: UserVideoQuota
|
||||
|
||||
const quotaUser = await servers[0].users.getMyQuotaUsed({ token: userAccessToken })
|
||||
do {
|
||||
await wait(500)
|
||||
|
||||
quotaUser = await servers[0].users.getMyQuotaUsed({ token: userAccessToken })
|
||||
} while (quotaUser.videoQuotaUsed < baseQuota.videoQuotaUsed)
|
||||
|
||||
const { data } = await servers[0].users.list()
|
||||
const quotaAdmin = data.find(u => u.username === 'user1')
|
||||
|
|
Loading…
Reference in New Issue