refactor(server): redis > ioredis (#5371)
* refactor(server): redis > ioredis
* refactor(JobQueue): reuse redis connection builder
* fix(redisio)
* fix(redis): setValue
* feat(redis): showFriendlyErrorStack
* feat(redis): auto pipelining
308017a6b9/README.md (autopipelining)
* dont use autopipelining for bullmq
* ioredis events
This commit is contained in:
parent
ff91b644fb
commit
564b9b5597
|
@ -129,6 +129,7 @@
|
|||
"helmet": "^6.0.0",
|
||||
"hpagent": "^1.0.0",
|
||||
"http-problem-details": "^0.1.5",
|
||||
"ioredis": "^5.2.3",
|
||||
"ip-anonymize": "^0.1.0",
|
||||
"ipaddr.js": "2.0.1",
|
||||
"is-cidr": "^4.0.0",
|
||||
|
@ -157,7 +158,6 @@
|
|||
"prompt": "^1.0.0",
|
||||
"proxy-addr": "^2.0.7",
|
||||
"pug": "^3.0.0",
|
||||
"redis": "^4.0.1",
|
||||
"reflect-metadata": "^0.1.12",
|
||||
"sanitize-html": "2.x",
|
||||
"sequelize": "6.21.6",
|
||||
|
|
|
@ -63,6 +63,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
|
|||
import { processVideoStudioEdition } from './handlers/video-studio-edition'
|
||||
import { processVideoTranscoding } from './handlers/video-transcoding'
|
||||
import { processVideosViewsStats } from './handlers/video-views-stats'
|
||||
import { Redis } from '../redis'
|
||||
|
||||
export type CreateJobArgument =
|
||||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
|
||||
|
@ -183,7 +184,7 @@ class JobQueue {
|
|||
}
|
||||
|
||||
this.flowProducer = new FlowProducer({
|
||||
connection: this.getRedisConnection(),
|
||||
connection: Redis.getRedisClientOptions('FlowProducer'),
|
||||
prefix: this.jobRedisPrefix
|
||||
})
|
||||
this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
|
||||
|
@ -196,7 +197,7 @@ class JobQueue {
|
|||
autorun: false,
|
||||
concurrency: this.getJobConcurrency(handlerName),
|
||||
prefix: this.jobRedisPrefix,
|
||||
connection: this.getRedisConnection()
|
||||
connection: Redis.getRedisClientOptions('Worker')
|
||||
}
|
||||
|
||||
const handler = function (job: Job) {
|
||||
|
@ -236,7 +237,7 @@ class JobQueue {
|
|||
|
||||
private buildQueue (handlerName: JobType) {
|
||||
const queueOptions: QueueOptions = {
|
||||
connection: this.getRedisConnection(),
|
||||
connection: Redis.getRedisClientOptions('Queue'),
|
||||
prefix: this.jobRedisPrefix
|
||||
}
|
||||
|
||||
|
@ -249,7 +250,7 @@ class JobQueue {
|
|||
private buildQueueScheduler (handlerName: JobType) {
|
||||
const queueSchedulerOptions: QueueSchedulerOptions = {
|
||||
autorun: false,
|
||||
connection: this.getRedisConnection(),
|
||||
connection: Redis.getRedisClientOptions('QueueScheduler'),
|
||||
prefix: this.jobRedisPrefix,
|
||||
maxStalledCount: 10
|
||||
}
|
||||
|
@ -263,7 +264,7 @@ class JobQueue {
|
|||
private buildQueueEvent (handlerName: JobType) {
|
||||
const queueEventsOptions: QueueEventsOptions = {
|
||||
autorun: false,
|
||||
connection: this.getRedisConnection(),
|
||||
connection: Redis.getRedisClientOptions('QueueEvent'),
|
||||
prefix: this.jobRedisPrefix
|
||||
}
|
||||
|
||||
|
@ -273,16 +274,6 @@ class JobQueue {
|
|||
this.queueEvents[handlerName] = queueEvents
|
||||
}
|
||||
|
||||
private getRedisConnection () {
|
||||
return {
|
||||
password: CONFIG.REDIS.AUTH,
|
||||
db: CONFIG.REDIS.DB,
|
||||
host: CONFIG.REDIS.HOSTNAME,
|
||||
port: CONFIG.REDIS.PORT,
|
||||
path: CONFIG.REDIS.SOCKET
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async terminate () {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { createClient, RedisClientOptions, RedisModules } from 'redis'
|
||||
import IoRedis, { RedisOptions } from 'ioredis'
|
||||
import { exists } from '@server/helpers/custom-validators/misc'
|
||||
import { sha256 } from '@shared/extra-utils'
|
||||
import { logger } from '../helpers/logger'
|
||||
|
@ -22,7 +22,7 @@ class Redis {
|
|||
private static instance: Redis
|
||||
private initialized = false
|
||||
private connected = false
|
||||
private client: ReturnType<typeof createClient>
|
||||
private client: IoRedis
|
||||
private prefix: string
|
||||
|
||||
private constructor () {
|
||||
|
@ -33,48 +33,44 @@ class Redis {
|
|||
if (this.initialized === true) return
|
||||
this.initialized = true
|
||||
|
||||
this.client = createClient(Redis.getRedisClientOptions())
|
||||
this.client.on('error', err => logger.error('Redis Client Error', { err }))
|
||||
|
||||
logger.info('Connecting to redis...')
|
||||
|
||||
this.client.connect()
|
||||
.then(() => {
|
||||
this.client = new IoRedis(Redis.getRedisClientOptions('', { enableAutoPipelining: true }))
|
||||
this.client.on('error', err => logger.error('Redis failed to connect', { err }))
|
||||
this.client.on('connect', () => {
|
||||
logger.info('Connected to redis.')
|
||||
|
||||
this.connected = true
|
||||
}).catch(err => {
|
||||
logger.error('Cannot connect to redis', { err })
|
||||
process.exit(-1)
|
||||
})
|
||||
this.client.on('reconnecting', (ms) => {
|
||||
logger.error(`Reconnecting to redis in ${ms}.`)
|
||||
})
|
||||
this.client.on('close', () => {
|
||||
logger.error('Connection to redis has closed.')
|
||||
this.connected = false
|
||||
})
|
||||
|
||||
this.client.on('end', () => {
|
||||
logger.error('Connection to redis has closed and no more reconnects will be done.')
|
||||
})
|
||||
|
||||
this.prefix = 'redis-' + WEBSERVER.HOST + '-'
|
||||
}
|
||||
|
||||
static getRedisClientOptions () {
|
||||
let config: RedisClientOptions<RedisModules, {}> = {
|
||||
socket: {
|
||||
connectTimeout: 20000 // Could be slow since node use sync call to compile PeerTube
|
||||
static getRedisClientOptions (connectionName?: string, options: RedisOptions = {}): RedisOptions {
|
||||
return {
|
||||
connectionName: [ 'PeerTube', connectionName ].join(''),
|
||||
connectTimeout: 20000, // Could be slow since node use sync call to compile PeerTube
|
||||
password: CONFIG.REDIS.AUTH,
|
||||
db: CONFIG.REDIS.DB,
|
||||
host: CONFIG.REDIS.HOSTNAME,
|
||||
port: CONFIG.REDIS.PORT,
|
||||
path: CONFIG.REDIS.SOCKET,
|
||||
showFriendlyErrorStack: true,
|
||||
...options
|
||||
}
|
||||
}
|
||||
|
||||
if (CONFIG.REDIS.AUTH) {
|
||||
config = { ...config, password: CONFIG.REDIS.AUTH }
|
||||
}
|
||||
|
||||
if (CONFIG.REDIS.DB) {
|
||||
config = { ...config, database: CONFIG.REDIS.DB }
|
||||
}
|
||||
|
||||
if (CONFIG.REDIS.HOSTNAME && CONFIG.REDIS.PORT) {
|
||||
config.socket = { ...config.socket, host: CONFIG.REDIS.HOSTNAME, port: CONFIG.REDIS.PORT }
|
||||
} else {
|
||||
config.socket = { ...config.socket, path: CONFIG.REDIS.SOCKET }
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
getClient () {
|
||||
return this.client
|
||||
}
|
||||
|
@ -388,15 +384,15 @@ class Redis {
|
|||
}
|
||||
|
||||
private getSet (key: string) {
|
||||
return this.client.sMembers(this.prefix + key)
|
||||
return this.client.smembers(this.prefix + key)
|
||||
}
|
||||
|
||||
private addToSet (key: string, value: string) {
|
||||
return this.client.sAdd(this.prefix + key, value)
|
||||
return this.client.sadd(this.prefix + key, value)
|
||||
}
|
||||
|
||||
private deleteFromSet (key: string, value: string) {
|
||||
return this.client.sRem(this.prefix + key, value)
|
||||
return this.client.srem(this.prefix + key, value)
|
||||
}
|
||||
|
||||
private deleteKey (key: string) {
|
||||
|
@ -415,11 +411,13 @@ class Redis {
|
|||
}
|
||||
|
||||
private async setValue (key: string, value: string, expirationMilliseconds?: number) {
|
||||
const options = expirationMilliseconds
|
||||
? { PX: expirationMilliseconds }
|
||||
: {}
|
||||
let result
|
||||
|
||||
const result = await this.client.set(this.prefix + key, value, options)
|
||||
if (expirationMilliseconds !== undefined) {
|
||||
result = await this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds)
|
||||
} else {
|
||||
result = await this.client.set(this.prefix + key, value)
|
||||
}
|
||||
|
||||
if (result !== 'OK') throw new Error('Redis set result is not OK.')
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ export class ApiCache {
|
|||
if (!Redis.Instance.isConnected()) return this.makeResponseCacheable(res, next, key, duration)
|
||||
|
||||
try {
|
||||
const obj = await redis.hGetAll(key)
|
||||
const obj = await redis.hgetall(key)
|
||||
if (obj?.response) {
|
||||
return this.sendCachedResponse(req, res, JSON.parse(obj.response), duration)
|
||||
}
|
||||
|
@ -100,8 +100,8 @@ export class ApiCache {
|
|||
|
||||
if (Redis.Instance.isConnected()) {
|
||||
await Promise.all([
|
||||
redis.hSet(key, 'response', JSON.stringify(value)),
|
||||
redis.hSet(key, 'duration', duration + ''),
|
||||
redis.hset(key, 'response', JSON.stringify(value)),
|
||||
redis.hset(key, 'duration', duration + ''),
|
||||
redis.expire(key, duration / 1000)
|
||||
])
|
||||
}
|
||||
|
|
65
yarn.lock
65
yarn.lock
|
@ -1869,40 +1869,6 @@
|
|||
smtp-server "^3.9.0"
|
||||
wildstring "1.0.9"
|
||||
|
||||
"@redis/bloom@1.0.2":
|
||||
version "1.0.2"
|
||||
resolved "https://registry.yarnpkg.com/@redis/bloom/-/bloom-1.0.2.tgz#42b82ec399a92db05e29fffcdfd9235a5fc15cdf"
|
||||
integrity sha512-EBw7Ag1hPgFzdznK2PBblc1kdlj5B5Cw3XwI9/oG7tSn85/HKy3X9xHy/8tm/eNXJYHLXHJL/pkwBpFMVVefkw==
|
||||
|
||||
"@redis/client@1.3.0":
|
||||
version "1.3.0"
|
||||
resolved "https://registry.yarnpkg.com/@redis/client/-/client-1.3.0.tgz#c62ccd707f16370a2dc2f9e158a28b7da049fa77"
|
||||
integrity sha512-XCFV60nloXAefDsPnYMjHGtvbtHR8fV5Om8cQ0JYqTNbWcQo/4AryzJ2luRj4blveWazRK/j40gES8M7Cp6cfQ==
|
||||
dependencies:
|
||||
cluster-key-slot "1.1.0"
|
||||
generic-pool "3.8.2"
|
||||
yallist "4.0.0"
|
||||
|
||||
"@redis/graph@1.0.1":
|
||||
version "1.0.1"
|
||||
resolved "https://registry.yarnpkg.com/@redis/graph/-/graph-1.0.1.tgz#eabc58ba99cd70d0c907169c02b55497e4ec8a99"
|
||||
integrity sha512-oDE4myMCJOCVKYMygEMWuriBgqlS5FqdWerikMoJxzmmTUErnTRRgmIDa2VcgytACZMFqpAOWDzops4DOlnkfQ==
|
||||
|
||||
"@redis/json@1.0.4":
|
||||
version "1.0.4"
|
||||
resolved "https://registry.yarnpkg.com/@redis/json/-/json-1.0.4.tgz#f372b5f93324e6ffb7f16aadcbcb4e5c3d39bda1"
|
||||
integrity sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==
|
||||
|
||||
"@redis/search@1.1.0":
|
||||
version "1.1.0"
|
||||
resolved "https://registry.yarnpkg.com/@redis/search/-/search-1.1.0.tgz#7abb18d431f27ceafe6bcb4dd83a3fa67e9ab4df"
|
||||
integrity sha512-NyFZEVnxIJEybpy+YskjgOJRNsfTYqaPbK/Buv6W2kmFNaRk85JiqjJZA5QkRmWvGbyQYwoO5QfDi2wHskKrQQ==
|
||||
|
||||
"@redis/time-series@1.0.3":
|
||||
version "1.0.3"
|
||||
resolved "https://registry.yarnpkg.com/@redis/time-series/-/time-series-1.0.3.tgz#4cfca8e564228c0bddcdf4418cba60c20b224ac4"
|
||||
integrity sha512-OFp0q4SGrTH0Mruf6oFsHGea58u8vS/iI5+NpYdicaM+7BgqBZH8FFvNZ8rYYLrUO/QRqMq72NpXmxLVNcdmjA==
|
||||
|
||||
"@selderee/plugin-htmlparser2@^0.6.0":
|
||||
version "0.6.0"
|
||||
resolved "https://registry.yarnpkg.com/@selderee/plugin-htmlparser2/-/plugin-htmlparser2-0.6.0.tgz#27e994afd1c2cb647ceb5406a185a5574188069d"
|
||||
|
@ -3460,7 +3426,7 @@ clone@^2.0.0:
|
|||
resolved "https://registry.yarnpkg.com/clone/-/clone-2.1.2.tgz#1b7f4b9f591f1e8f83670401600345a02887435f"
|
||||
integrity sha512-3Pe/CF1Nn94hyhIYpjtiLhdCoEoz0DqQ+988E9gmeEdQZlojxnOb74wctFyuwWQHzqyf9X7C7MG8juUpqBJT8w==
|
||||
|
||||
cluster-key-slot@1.1.0, cluster-key-slot@^1.1.0:
|
||||
cluster-key-slot@^1.1.0:
|
||||
version "1.1.0"
|
||||
resolved "https://registry.yarnpkg.com/cluster-key-slot/-/cluster-key-slot-1.1.0.tgz#30474b2a981fb12172695833052bc0d01336d10d"
|
||||
integrity sha512-2Nii8p3RwAPiFwsnZvukotvow2rIHM+yQ6ZcBXGHdniadkYGZYiGmkHJIbZPIV9nfv7m/U1IPMVVcAhoWFeklw==
|
||||
|
@ -5021,11 +4987,6 @@ gauge@^3.0.0:
|
|||
strip-ansi "^6.0.1"
|
||||
wide-align "^1.1.2"
|
||||
|
||||
generic-pool@3.8.2:
|
||||
version "3.8.2"
|
||||
resolved "https://registry.yarnpkg.com/generic-pool/-/generic-pool-3.8.2.tgz#aab4f280adb522fdfbdc5e5b64d718d3683f04e9"
|
||||
integrity sha512-nGToKy6p3PAbYQ7p1UlWl6vSPwfwU6TMSWK7TTu+WUY4ZjyZQGniGGt2oNVvyNSpyZYSB43zMXVLcBm08MTMkg==
|
||||
|
||||
get-browser-rtc@^1.1.0:
|
||||
version "1.1.0"
|
||||
resolved "https://registry.yarnpkg.com/get-browser-rtc/-/get-browser-rtc-1.1.0.tgz#d1494e299b00f33fc8e9d6d3343ba4ba99711a2c"
|
||||
|
@ -5553,7 +5514,7 @@ invariant@2.2.4:
|
|||
dependencies:
|
||||
loose-envify "^1.0.0"
|
||||
|
||||
ioredis@^5.2.2:
|
||||
ioredis@^5.2.2, ioredis@^5.2.3:
|
||||
version "5.2.3"
|
||||
resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.2.3.tgz#d5b37eb13e643241660d6cee4eeb41a026cda8c0"
|
||||
integrity sha512-gQNcMF23/NpvjCaa1b5YycUyQJ9rBNH2xP94LWinNpodMWVUPP5Ai/xXANn/SM7gfIvI62B5CCvZxhg5pOgyMw==
|
||||
|
@ -7865,18 +7826,6 @@ redis-parser@^3.0.0:
|
|||
dependencies:
|
||||
redis-errors "^1.0.0"
|
||||
|
||||
redis@^4.0.1:
|
||||
version "4.3.1"
|
||||
resolved "https://registry.yarnpkg.com/redis/-/redis-4.3.1.tgz#290532a0c22221e05e991162ac4dca1e1b2ff6da"
|
||||
integrity sha512-cM7yFU5CA6zyCF7N/+SSTcSJQSRMEKN0k0Whhu6J7n9mmXRoXugfWDBo5iOzGwABmsWKSwGPTU5J4Bxbl+0mrA==
|
||||
dependencies:
|
||||
"@redis/bloom" "1.0.2"
|
||||
"@redis/client" "1.3.0"
|
||||
"@redis/graph" "1.0.1"
|
||||
"@redis/json" "1.0.4"
|
||||
"@redis/search" "1.1.0"
|
||||
"@redis/time-series" "1.0.3"
|
||||
|
||||
reflect-metadata@^0.1.12:
|
||||
version "0.1.13"
|
||||
resolved "https://registry.yarnpkg.com/reflect-metadata/-/reflect-metadata-0.1.13.tgz#67ae3ca57c972a2aa1642b10fe363fe32d49dc08"
|
||||
|
@ -9574,16 +9523,16 @@ y18n@^5.0.5:
|
|||
resolved "https://registry.yarnpkg.com/y18n/-/y18n-5.0.8.tgz#7f4934d0f7ca8c56f95314939ddcd2dd91ce1d55"
|
||||
integrity sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==
|
||||
|
||||
yallist@4.0.0, yallist@^4.0.0:
|
||||
version "4.0.0"
|
||||
resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72"
|
||||
integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==
|
||||
|
||||
yallist@^2.1.2:
|
||||
version "2.1.2"
|
||||
resolved "https://registry.yarnpkg.com/yallist/-/yallist-2.1.2.tgz#1c11f9218f076089a47dd512f93c6699a6a81d52"
|
||||
integrity sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A==
|
||||
|
||||
yallist@^4.0.0:
|
||||
version "4.0.0"
|
||||
resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72"
|
||||
integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==
|
||||
|
||||
yaml@^1.10.0:
|
||||
version "1.10.2"
|
||||
resolved "https://registry.yarnpkg.com/yaml/-/yaml-1.10.2.tgz#2301c5ffbf12b467de8da2333a459e29e7920e4b"
|
||||
|
|
Loading…
Reference in New Issue