diff --git a/streaming/constants.mjs b/streaming/constants.mjs new file mode 100644 index 0000000000..6a955b8862 --- /dev/null +++ b/streaming/constants.mjs @@ -0,0 +1,27 @@ +// @ts-check + +import * as os from 'node:os'; + +/** @type {boolean} */ +export const alwaysRequireAuth = + process.env.LIMITED_FEDERATION_MODE === 'true' || + process.env.WHITELIST_MODE === 'true' || + process.env.AUTHORIZED_FETCH === 'true'; + +/** @type {string} */ +export const env = process.env.NODE_ENV || 'development'; + +/** @type {string} */ +export const logLevel = process.env.LOG_LEVEL || 'verbose'; + +/** @type {number} */ +export const numWorkers = + +process.env.STREAMING_CLUSTER_NUM || + (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); + +/** @type {string} */ +export const redisNamespace = process.env.REDIS_NAMESPACE || null; + +export const trustedProxyIp = process.env.TRUSTED_PROXY_IP + ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) + : 'loopback,uniquelocal'; diff --git a/streaming/index.js b/streaming/index.mjs similarity index 66% rename from streaming/index.js rename to streaming/index.mjs index 6935c47645..597ae333eb 100644 --- a/streaming/index.js +++ b/streaming/index.mjs @@ -1,112 +1,51 @@ // @ts-check -const os = require('os'); -const throng = require('throng'); -const dotenv = require('dotenv'); -const express = require('express'); -const http = require('http'); -const redis = require('redis'); -const pg = require('pg'); -const log = require('npmlog'); -const url = require('url'); -const uuid = require('uuid'); -const fs = require('fs'); -const WebSocket = require('ws'); +import * as http from 'node:http'; +import * as url from 'node:url'; -const env = process.env.NODE_ENV || 'development'; -const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true'; +import * as dotenv from 'dotenv'; +import express from 'express'; +import log from 'npmlog'; +import * as pg from 'pg'; +import throng from 'throng'; +import * as uuid from 'uuid'; +import WebSocket from 'ws'; + +import { + alwaysRequireAuth, + env, + logLevel, + numWorkers, + redisNamespace, + trustedProxyIp, +} from './constants.mjs'; +import { + allowCrossDomain, + authenticationMiddleware, + errorMiddleware, + setRemoteAddress, + setRequestId, +} from './middlewares.mjs'; +import Subscriber from './subscriber.mjs'; +import { + attachServerWithConfig, + checkScopes, + dbUrlToConfig, + firstParam, + httpNotFound, + isInScope, + isTruthy, + onPortAvailable, + parseJSON, + placeholders, + redisUrlToClient, +} from './utils.mjs'; dotenv.config({ path: env === 'production' ? '.env.production' : '.env', }); -log.level = process.env.LOG_LEVEL || 'verbose'; - -/** - * @param {string} dbUrl - * @return {Object.} - */ -const dbUrlToConfig = (dbUrl) => { - if (!dbUrl) { - return {}; - } - - const params = url.parse(dbUrl, true); - const config = {}; - - if (params.auth) { - [config.user, config.password] = params.auth.split(':'); - } - - if (params.hostname) { - config.host = params.hostname; - } - - if (params.port) { - config.port = params.port; - } - - if (params.pathname) { - config.database = params.pathname.split('/')[1]; - } - - const ssl = params.query && params.query.ssl; - - if (ssl && ssl === 'true' || ssl === '1') { - config.ssl = true; - } - - return config; -}; - -/** - * @param {Object.} defaultConfig - * @param {string} redisUrl - */ -const redisUrlToClient = async (defaultConfig, redisUrl) => { - const config = defaultConfig; - - let client; - - if (!redisUrl) { - client = redis.createClient(config); - } else if (redisUrl.startsWith('unix://')) { - client = redis.createClient(Object.assign(config, { - socket: { - path: redisUrl.slice(7), - }, - })); - } else { - client = redis.createClient(Object.assign(config, { - url: redisUrl, - })); - } - - client.on('error', (err) => log.error('Redis Client Error!', err)); - await client.connect(); - - return client; -}; - -const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); - -/** - * @param {string} json - * @param {any} req - * @return {Object.|null} - */ -const parseJSON = (json, req) => { - try { - return JSON.parse(json); - } catch (err) { - if (req.accountId) { - log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`); - } else { - log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`); - } - return null; - } -}; +log.level = logLevel; const startMaster = () => { if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) { @@ -119,13 +58,14 @@ const startMaster = () => { const startWorker = async (workerId) => { log.warn(`Starting worker ${workerId}`); + /** @type {Record} */ const pgConfigs = { development: { user: process.env.DB_USER || pg.defaults.user, password: process.env.DB_PASS || pg.defaults.password, database: process.env.DB_NAME || 'mastodon_development', host: process.env.DB_HOST || pg.defaults.host, - port: process.env.DB_PORT || pg.defaults.port, + port: process.env.DB_PORT ? +process.env.DB_PORT : pg.defaults.port, max: 10, }, @@ -134,7 +74,7 @@ const startWorker = async (workerId) => { password: process.env.DB_PASS || '', database: process.env.DB_NAME || 'mastodon_production', host: process.env.DB_HOST || 'localhost', - port: process.env.DB_PORT || 5432, + port: process.env.DB_PORT ? +process.env.DB_PORT : 5432, max: 10, }, }; @@ -146,34 +86,31 @@ const startWorker = async (workerId) => { const app = express(); - app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal'); + app.set('trust proxy', trustedProxyIp); - const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL))); + const pgPool = new pg.Pool({ + ...pgConfigs[env], + ...dbUrlToConfig(process.env.DATABASE_URL), + }); const server = http.createServer(app); - const redisNamespace = process.env.REDIS_NAMESPACE || null; + /** @type {import('redis').RedisClientOptions} */ const redisParams = { socket: { host: process.env.REDIS_HOST || '127.0.0.1', - port: process.env.REDIS_PORT || 6379, + port: process.env.REDIS_PORT ? +process.env.REDIS_PORT : 6379, }, - database: process.env.REDIS_DB || 0, + database: process.env.REDIS_DB ? +process.env.REDIS_DB : 0, password: process.env.REDIS_PASSWORD || undefined, }; - if (redisNamespace) { - redisParams.namespace = redisNamespace; - } - const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; - /** - * @type {Object.>} - */ - const subs = {}; - const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); + const subscriber = new Subscriber({ + redisClient: redisSubscribeClient, + }); /** * @param {string[]} channels @@ -195,121 +132,6 @@ const startWorker = async (workerId) => { }; }; - /** - * @param {string} message - * @param {string} channel - */ - const onRedisMessage = (message, channel) => { - const callbacks = subs[channel]; - - log.silly(`New message on channel ${channel}`); - - if (!callbacks) { - return; - } - - callbacks.forEach(callback => callback(message)); - }; - - /** - * @param {string} channel - * @param {function(string): void} callback - */ - const subscribe = (channel, callback) => { - log.silly(`Adding listener for ${channel}`); - - subs[channel] = subs[channel] || []; - - if (subs[channel].length === 0) { - log.verbose(`Subscribe ${channel}`); - redisSubscribeClient.subscribe(channel, onRedisMessage); - } - - subs[channel].push(callback); - }; - - /** - * @param {string} channel - */ - const unsubscribe = (channel, callback) => { - log.silly(`Removing listener for ${channel}`); - - if (!subs[channel]) { - return; - } - - subs[channel] = subs[channel].filter(item => item !== callback); - - if (subs[channel].length === 0) { - log.verbose(`Unsubscribe ${channel}`); - redisSubscribeClient.unsubscribe(channel); - delete subs[channel]; - } - }; - - const FALSE_VALUES = [ - false, - 0, - '0', - 'f', - 'F', - 'false', - 'FALSE', - 'off', - 'OFF', - ]; - - /** - * @param {any} value - * @return {boolean} - */ - const isTruthy = value => - value && !FALSE_VALUES.includes(value); - - /** - * @param {any} req - * @param {any} res - * @param {function(Error=): void} - */ - const allowCrossDomain = (req, res, next) => { - res.header('Access-Control-Allow-Origin', '*'); - res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'); - res.header('Access-Control-Allow-Methods', 'GET, OPTIONS'); - - next(); - }; - - /** - * @param {any} req - * @param {any} res - * @param {function(Error=): void} - */ - const setRequestId = (req, res, next) => { - req.requestId = uuid.v4(); - res.header('X-Request-Id', req.requestId); - - next(); - }; - - /** - * @param {any} req - * @param {any} res - * @param {function(Error=): void} - */ - const setRemoteAddress = (req, res, next) => { - req.remoteAddress = req.connection.remoteAddress; - - next(); - }; - - /** - * @param {any} req - * @param {string[]} necessaryScopes - * @return {boolean} - */ - const isInScope = (req, necessaryScopes) => - req.scopes.some(scope => necessaryScopes.includes(scope)); - /** * @param {string} token * @param {any} req @@ -409,58 +231,6 @@ const startWorker = async (workerId) => { } }; - const PUBLIC_CHANNELS = [ - 'public', - 'public:media', - 'public:local', - 'public:local:media', - 'public:remote', - 'public:remote:media', - 'hashtag', - 'hashtag:local', - ]; - - /** - * @param {any} req - * @param {string} channelName - * @return {Promise.} - */ - const checkScopes = (req, channelName) => new Promise((resolve, reject) => { - log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`); - - // When accessing public channels, no scopes are needed - if (PUBLIC_CHANNELS.includes(channelName)) { - resolve(); - return; - } - - // The `read` scope has the highest priority, if the token has it - // then it can access all streams - const requiredScopes = ['read']; - - // When accessing specifically the notifications stream, - // we need a read:notifications, while in all other cases, - // we can allow access with read:statuses. Mind that the - // user stream will not contain notifications unless - // the token has either read or read:notifications scope - // as well, this is handled separately. - if (channelName === 'user:notification') { - requiredScopes.push('read:notifications'); - } else { - requiredScopes.push('read:statuses'); - } - - if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) { - resolve(); - return; - } - - const err = new Error('Access token does not cover required scopes'); - err.status = 401; - - reject(err); - }); - /** * @param {any} info * @param {function(boolean, number, string): void} callback @@ -507,73 +277,6 @@ const startWorker = async (workerId) => { }; }; - /** - * @param {any} req - * @param {any} res - */ - const subscribeHttpToSystemChannel = (req, res) => { - const systemChannelId = `timeline:access_token:${req.accessTokenId}`; - - const listener = createSystemMessageListener(req, { - - onKill() { - res.end(); - }, - - }); - - res.on('close', () => { - unsubscribe(`${redisPrefix}${systemChannelId}`, listener); - }); - - subscribe(`${redisPrefix}${systemChannelId}`, listener); - }; - - /** - * @param {any} req - * @param {any} res - * @param {function(Error=): void} next - */ - const authenticationMiddleware = (req, res, next) => { - if (req.method === 'OPTIONS') { - next(); - return; - } - - accountFromRequest(req, alwaysRequireAuth).then(() => checkScopes(req, channelNameFromPath(req))).then(() => { - subscribeHttpToSystemChannel(req, res); - }).then(() => { - next(); - }).catch(err => { - next(err); - }); - }; - - /** - * @param {Error} err - * @param {any} req - * @param {any} res - * @param {function(Error=): void} next - */ - const errorMiddleware = (err, req, res, next) => { - log.error(req.requestId, err.toString()); - - if (res.headersSent) { - next(err); - return; - } - - res.writeHead(err.status || 500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: err.status ? err.toString() : 'An unexpected error occurred' })); - }; - - /** - * @param {array} arr - * @param {number=} shift - * @return {string} - */ - const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); - /** * @param {string} listId * @param {any} req @@ -690,7 +393,7 @@ const startWorker = async (workerId) => { }; ids.forEach(id => { - subscribe(`${redisPrefix}${id}`, listener); + subscriber.register(`${redisPrefix}${id}`, listener); }); if (attachCloseHandler) { @@ -735,7 +438,7 @@ const startWorker = async (workerId) => { const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id); + subscriber.unregister(id); }); if (closeHandler) { @@ -759,14 +462,6 @@ const startWorker = async (workerId) => { ws.send(JSON.stringify({ stream: streamName, event, payload })); }; - /** - * @param {any} res - */ - const httpNotFound = res => { - res.writeHead(404, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Not found' })); - }; - app.use(setRequestId); app.use(setRemoteAddress); app.use(allowCrossDomain); @@ -780,15 +475,16 @@ const startWorker = async (workerId) => { app.use(errorMiddleware); app.get('/api/v1/streaming/*', (req, res) => { - channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { - const onSend = streamToHttp(req, res); - const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); + channelNameToIds(req, channelNameFromPath(req), req.query) + .then(({ channelIds, options }) => { + const onSend = streamToHttp(req, res); + const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); - streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering); - }).catch(err => { - log.verbose(req.requestId, 'Subscription error:', err.toString()); - httpNotFound(res); - }); + streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering); + }).catch(err => { + log.verbose(req.requestId, 'Subscription error:', err.toString()); + httpNotFound(res); + }); }); const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient }); @@ -994,7 +690,7 @@ const startWorker = async (workerId) => { const { listener, stopHeartbeat } = subscription; channelIds.forEach(channelId => { - unsubscribe(`${redisPrefix}${channelId}`, listener); + subscriber.unregister(`${redisPrefix}${channelId}`, listener); }); stopHeartbeat(); @@ -1019,7 +715,7 @@ const startWorker = async (workerId) => { }); - subscribe(`${redisPrefix}${systemChannelId}`, listener); + subscriber.register(`${redisPrefix}${systemChannelId}`, listener); subscriptions[systemChannelId] = { listener, @@ -1028,18 +724,6 @@ const startWorker = async (workerId) => { }; }; - /** - * @param {string|string[]} arrayOrString - * @return {string} - */ - const firstParam = arrayOrString => { - if (Array.isArray(arrayOrString)) { - return arrayOrString[0]; - } else { - return arrayOrString; - } - }; - wss.on('connection', (ws, req) => { const location = url.parse(req.url, true); @@ -1068,7 +752,7 @@ const startWorker = async (workerId) => { const { listener, stopHeartbeat } = session.subscriptions[channelIds]; channelIds.split(';').forEach(channelId => { - unsubscribe(`${redisPrefix}${channelId}`, listener); + subscriber.unregister(`${redisPrefix}${channelId}`, listener); }); stopHeartbeat(); @@ -1135,45 +819,6 @@ const startWorker = async (workerId) => { process.on('uncaughtException', onError); }; -/** - * @param {any} server - * @param {function(string): void} [onSuccess] - */ -const attachServerWithConfig = (server, onSuccess) => { - if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { - server.listen(process.env.SOCKET || process.env.PORT, () => { - if (onSuccess) { - fs.chmodSync(server.address(), 0o666); - onSuccess(server.address()); - } - }); - } else { - server.listen(+process.env.PORT || 4000, process.env.BIND || '127.0.0.1', () => { - if (onSuccess) { - onSuccess(`${server.address().address}:${server.address().port}`); - } - }); - } -}; - -/** - * @param {function(Error=): void} onSuccess - */ -const onPortAvailable = onSuccess => { - const testServer = http.createServer(); - - testServer.once('error', err => { - onSuccess(err); - }); - - testServer.once('listening', () => { - testServer.once('close', () => onSuccess()); - testServer.close(); - }); - - attachServerWithConfig(testServer); -}; - onPortAvailable(err => { if (err) { log.error('Could not start server, the port or socket is in use'); diff --git a/streaming/middlewares.mjs b/streaming/middlewares.mjs new file mode 100644 index 0000000000..d90c1ba967 --- /dev/null +++ b/streaming/middlewares.mjs @@ -0,0 +1,62 @@ +// @ts-check + +import log from 'npmlog'; +import * as uuid from 'uuid'; + +import { alwaysRequireAuth } from './constants.mjs'; +import { checkScopes } from './utils.mjs'; + +/** @type {import('express').RequestHandler} */ +export function allowCrossDomain(_req, res, next) { + res.header('Access-Control-Allow-Origin', '*'); + res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'); + res.header('Access-Control-Allow-Methods', 'GET, OPTIONS'); + + next(); +} + +/** @type {import('express').RequestHandler} */ +export function authenticationMiddleware(req, res, next) { + if (req.method === 'OPTIONS') { + next(); + return; + } + + accountFromRequest(req, alwaysRequireAuth) + .then(() => checkScopes(req, channelNameFromPath(req))) + .then(() => { + subscribeHttpToSystemChannel(req, res); + }).then(() => { + next(); + }).catch(err => { + next(err); + }); +} + +/** @type {import('express').ErrorRequestHandler} */ +export function errorMiddleware(err, req, res, next) { + log.error(req.requestId, err.toString()); + + if (res.headersSent) { + next(err); + return; + } + + res.writeHead(err.status || 500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: err.status ? err.toString() : 'An unexpected error occurred' })); +} + +/** @type {import('express').RequestHandler} */ +export function setRemoteAddress(req, _res, next) { + req.remoteAddress = req.connection.remoteAddress; + + next(); +} + +/** @type {import('express').RequestHandler} */ +export function setRequestId(req, res, next) { + req.requestId = uuid.v4(); + res.header('X-Request-Id', req.requestId); + + next(); +} diff --git a/streaming/subscriber.mjs b/streaming/subscriber.mjs new file mode 100644 index 0000000000..18ee048af4 --- /dev/null +++ b/streaming/subscriber.mjs @@ -0,0 +1,88 @@ +// @ts-check + +import log from 'npmlog'; + +const LOG_PREFIX = 'subscriber'; + +/** + * @typedef SubscriberOptions + * @property {import('redis').RedisClientType} redisClient + */ + +/** + * @callback SubscribeListener + * @param {string} message + */ + +export default class Subscriber { + + /** + * @param {SubscriberOptions} options + */ + constructor({ redisClient }) { + /** @type {import('redis').RedisClientType} */ + this.redisClient = redisClient; + + /** @type {Map>} */ + this.subs = new Map(); + } + + /** + * @param {string} message + * @param {string} channel + * @returns {void} + */ + handleRedisMessage = (message, channel) => { + log.silly(LOG_PREFIX, `New message on channel ${channel}`); + + if (this.subs.has(channel)) { + for (const listener of this.subs.get(channel)) { + listener.call(this, message); + } + } + }; + + /** + * @param {string} channel + * @param {SubscribeListener} listener + * @returns {void} + */ + register(channel, listener) { + log.silly(LOG_PREFIX, `Adding listener for ${channel}`); + + if (this.subs.has(channel)) { + this.subs.set(channel, new Set()); + } + + if (this.subs.get(channel).size < 1) { + log.verbose(LOG_PREFIX, `Subscribe ${channel}`); + this.redisClient.subscribe(channel, this.handleRedisMessage); + } + + this.subs.get(channel).add(listener); + } + + /** + * @param {string} channel + * @param {SubscribeListener=} listener + * @returns {void} + */ + unregister(channel, listener) { + log.silly(LOG_PREFIX, `Removing listener for ${channel}`); + + if (!this.subs.has(channel)) { + return; + } + + if (typeof listener === 'function') { + this.subs.get(channel).delete(listener); + } + + if (this.subs.get(channel).size < 1) { + log.verbose(LOG_PREFIX, `Unsubscribe ${channel}`); + this.redisClient.unsubscribe(channel); + this.subs.delete(channel); + } + } + +} diff --git a/streaming/utils.mjs b/streaming/utils.mjs new file mode 100644 index 0000000000..f3d4fda143 --- /dev/null +++ b/streaming/utils.mjs @@ -0,0 +1,272 @@ +// @ts-check + +import * as fs from 'node:fs'; +import * as http from 'node:http'; + +import log from 'npmlog'; +import * as redis from 'redis'; + +const LOG_PREFIX = 'utils'; + +/** + * @param {http.Server} server + * @param {((address: string) => void)=} onSuccess + * @returns {void} + */ +export function attachServerWithConfig(server, onSuccess) { + if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { + server.listen(process.env.SOCKET || process.env.PORT, () => { + if (!onSuccess) return; + + fs.chmodSync(server.address(), 0o666); + onSuccess(server.address()); + }); + } else { + server.listen(+process.env.PORT || 4000, process.env.BIND || '127.0.0.1', () => { + if (onSuccess) { + onSuccess(`${server.address().address}:${server.address().port}`); + } + }); + } +}; + +const PUBLIC_CHANNELS = [ + 'public', + 'public:media', + 'public:local', + 'public:local:media', + 'public:remote', + 'public:remote:media', + 'hashtag', + 'hashtag:local', +]; + +/** + * @param {import('express').Request} req + * @param {string} channelName + * @return {Promise} + */ +export function checkScopes(req, channelName) { + return new Promise((resolve, reject) => { + log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`); + + // When accessing public channels, no scopes are needed + if (PUBLIC_CHANNELS.includes(channelName)) { + resolve(); + return; + } + + // The `read` scope has the highest priority, if the token has it + // then it can access all streams + const requiredScopes = ['read']; + + // When accessing specifically the notifications stream, + // we need a read:notifications, while in all other cases, + // we can allow access with read:statuses. Mind that the + // user stream will not contain notifications unless + // the token has either read or read:notifications scope + // as well, this is handled separately. + if (channelName === 'user:notification') { + requiredScopes.push('read:notifications'); + } else { + requiredScopes.push('read:statuses'); + } + + if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) { + resolve(); + return; + } + + const err = new Error('Access token does not cover required scopes'); + err.status = 401; + + reject(err); + }); +} + +/** + * @param {string} dbUrl + * @return {import('pg').PoolConfig} + */ +export function dbUrlToConfig(dbUrl) { + if (!dbUrl) { + return {}; + } + + const url = new URL(dbUrl); + /** @type {import('pg').PoolConfig} */ + const config = {}; + + if (url.username) { + config.user = url.username; + } + + if (url.password) { + config.password = url.password; + } + + if (url.hostname) { + config.host = url.hostname; + } + + if (url.port) { + config.port = +url.port; + } + + if (url.pathname) { + config.database = url.pathname.split('/')[1]; + } + + if (url.searchParams.has('ssl')) { + config.ssl = isTruthy(url.searchParams.get('ssl')); + } + + return config; +} + +/** + * @param {string | string[]} arrayOrString + * @returns {string} + */ +export function firstParam(arrayOrString) { + return Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString; +} + +/** + * @param {import('express').Response} res + * @returns {void} + */ +export function httpNotFound(res) { + res.status(404).json({ + error: 'Not found', + }); +}; + +const FALSE_VALUES = [ + false, + 0, + '0', + 'f', + 'F', + 'false', + 'FALSE', + 'off', + 'OFF', +]; + +/** + * @param {import('express').Request} req + * @param {string[]} necessaryScopes + * @returns {boolean} + */ +export function isInScope(req, necessaryScopes) { + return req.scopes.some(scope => necessaryScopes.includes(scope)); +} + +/** + * @param {boolean | number | string} value + * @return {boolean} + */ +export function isTruthy(value) { + return value && !FALSE_VALUES.includes(value); +} + +/** + * @param {((err?: Error) => void)=} onSuccess + * @returns {void} + */ +export function onPortAvailable(onSuccess) { + const testServer = http.createServer(); + + testServer.once('error', err => { + onSuccess(err); + }); + + testServer.once('listening', () => { + testServer.once('close', () => onSuccess()); + testServer.close(); + }); + + attachServerWithConfig(testServer); +} + +/** + * @param {string} json + * @param {Express.Request} req + * @returns {object | null} + */ +export function parseJSON(json, req) { + try { + return JSON.parse(json); + } catch (err) { + if (req.accountId) { + log.warn(req.requestId, `Error parsing message from user ${req.accountId}:`, err); + } else { + log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}:`, err); + } + } + + return null; +} + +/** + * @param {string[]} values + * @param {number=} shift + * @return {string} + */ +export function placeholders(values, shift = 0) { + return values.map((_, i) => `$${i + 1 + shift}`).join(', '); +} + +/** + * @param {redis.RedisClientOptions} defaultConfig + * @param {string} redisUrl + * @returns {Promise} + */ +export async function redisUrlToClient(defaultConfig, redisUrl) { + /** @type {redis.RedisClientType} */ + let client; + + if (!redisUrl) { + client = redis.createClient(defaultConfig); + } else if (redisUrl.startsWith('unix://')) { + client = redis.createClient({ + ...defaultConfig, + socket: { + path: redisUrl.slice(7), + }, + }); + } else { + client = redis.createClient({ + ...defaultConfig, + url: redisUrl, + }); + } + + client.on('error', (err) => { + log.error(LOG_PREFIX, 'Redis Client Error!', err); + }); + await client.connect(); + + return client; +} + +/** + * @param {import('express').Request} req + * @param {import('express').Response} res + */ +export function subscribeHttpToSystemChannel(req, res) { + const systemChannelId = `timeline:access_token:${req.accessTokenId}`; + + const listener = createSystemMessageListener(req, { + onKill() { + res.end(); + }, + }); + + res.on('close', () => { + unsubscribe(`${redisPrefix}${systemChannelId}`, listener); + }); + + subscribe(`${redisPrefix}${systemChannelId}`, listener); +}