Merge tag 'v4.1.6' of https://github.com/mastodon/mastodon into paravielfalt-4.1
All checks were successful
continuous-integration/drone/tag Build is passing
All checks were successful
continuous-integration/drone/tag Build is passing
This commit is contained in:
commit
09a5c13129
5 changed files with 150 additions and 72 deletions
|
@ -3,6 +3,14 @@ Changelog
|
|||
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
## [4.1.6] - 2023-07-31
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fix memory leak in streaming server ([ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26228))
|
||||
- Fix wrong filters sometimes applying in streaming ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26159), [ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26213), [renchap](https://github.com/mastodon/mastodon/pull/26233))
|
||||
- Fix incorrect connect timeout in outgoing requests ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26116))
|
||||
|
||||
## [4.1.5] - 2023-07-21
|
||||
|
||||
### Added
|
||||
|
|
|
@ -285,11 +285,11 @@ class Request
|
|||
end
|
||||
|
||||
until socks.empty?
|
||||
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect])
|
||||
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect_timeout])
|
||||
|
||||
if available_socks.nil?
|
||||
socks.each(&:close)
|
||||
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds"
|
||||
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect_timeout]} seconds"
|
||||
end
|
||||
|
||||
available_socks.each do |sock|
|
||||
|
|
|
@ -56,7 +56,7 @@ services:
|
|||
|
||||
web:
|
||||
build: .
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||
restart: always
|
||||
env_file: .env.production
|
||||
command: bash -c "rm -f /mastodon/tmp/pids/server.pid; bundle exec rails s -p 3000"
|
||||
|
@ -77,7 +77,7 @@ services:
|
|||
|
||||
streaming:
|
||||
build: .
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||
restart: always
|
||||
env_file: .env.production
|
||||
command: node ./streaming
|
||||
|
@ -95,7 +95,7 @@ services:
|
|||
|
||||
sidekiq:
|
||||
build: .
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||
restart: always
|
||||
env_file: .env.production
|
||||
command: bundle exec sidekiq
|
||||
|
|
|
@ -13,7 +13,7 @@ module Mastodon
|
|||
end
|
||||
|
||||
def patch
|
||||
5
|
||||
6
|
||||
end
|
||||
|
||||
def flags
|
||||
|
|
|
@ -226,9 +226,15 @@ const startWorker = async (workerId) => {
|
|||
callbacks.forEach(callback => callback(json));
|
||||
};
|
||||
|
||||
/**
|
||||
* @callback SubscriptionListener
|
||||
* @param {ReturnType<parseJSON>} json of the message
|
||||
* @returns void
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {string} channel
|
||||
* @param {function(string): void} callback
|
||||
* @param {SubscriptionListener} callback
|
||||
*/
|
||||
const subscribe = (channel, callback) => {
|
||||
log.silly(`Adding listener for ${channel}`);
|
||||
|
@ -245,7 +251,7 @@ const startWorker = async (workerId) => {
|
|||
|
||||
/**
|
||||
* @param {string} channel
|
||||
* @param {function(Object<string, any>): void} callback
|
||||
* @param {SubscriptionListener} callback
|
||||
*/
|
||||
const unsubscribe = (channel, callback) => {
|
||||
log.silly(`Removing listener for ${channel}`);
|
||||
|
@ -623,51 +629,66 @@ const startWorker = async (workerId) => {
|
|||
* @param {string[]} ids
|
||||
* @param {any} req
|
||||
* @param {function(string, string): void} output
|
||||
* @param {function(string[], function(string): void): void} attachCloseHandler
|
||||
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
|
||||
* @param {boolean=} needsFiltering
|
||||
* @returns {function(object): void}
|
||||
* @returns {SubscriptionListener}
|
||||
*/
|
||||
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
|
||||
const accountId = req.accountId || req.remoteAddress;
|
||||
|
||||
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
|
||||
|
||||
// Currently message is of type string, soon it'll be Record<string, any>
|
||||
const transmit = (event, payload) => {
|
||||
// TODO: Replace "string"-based delete payloads with object payloads:
|
||||
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
|
||||
|
||||
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
|
||||
output(event, encodedPayload);
|
||||
};
|
||||
|
||||
// The listener used to process each message off the redis subscription,
|
||||
// message here is an object with an `event` and `payload` property. Some
|
||||
// events also include a queued_at value, but this is being removed shortly.
|
||||
/** @type {SubscriptionListener} */
|
||||
const listener = message => {
|
||||
const { event, payload, queued_at } = message;
|
||||
const { event, payload } = message;
|
||||
|
||||
const transmit = () => {
|
||||
const now = new Date().getTime();
|
||||
const delta = now - queued_at;
|
||||
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
|
||||
|
||||
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
|
||||
output(event, encodedPayload);
|
||||
};
|
||||
|
||||
// Only messages that may require filtering are statuses, since notifications
|
||||
// are already personalized and deletes do not matter
|
||||
if (!needsFiltering || event !== 'update') {
|
||||
transmit();
|
||||
// Streaming only needs to apply filtering to some channels and only to
|
||||
// some events. This is because majority of the filtering happens on the
|
||||
// Ruby on Rails side when producing the event for streaming.
|
||||
//
|
||||
// The only events that require filtering from the streaming server are
|
||||
// `update` and `status.update`, all other events are transmitted to the
|
||||
// client as soon as they're received (pass-through).
|
||||
//
|
||||
// The channels that need filtering are determined in the function
|
||||
// `channelNameToIds` defined below:
|
||||
if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
|
||||
transmit(event, payload);
|
||||
return;
|
||||
}
|
||||
|
||||
const unpackedPayload = payload;
|
||||
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
|
||||
const accountDomain = unpackedPayload.account.acct.split('@')[1];
|
||||
// The rest of the logic from here on in this function is to handle
|
||||
// filtering of statuses:
|
||||
|
||||
if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
|
||||
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
|
||||
// Filter based on language:
|
||||
if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
|
||||
log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
|
||||
return;
|
||||
}
|
||||
|
||||
// When the account is not logged in, it is not necessary to confirm the block or mute
|
||||
if (!req.accountId) {
|
||||
transmit();
|
||||
transmit(event, payload);
|
||||
return;
|
||||
}
|
||||
|
||||
pgPool.connect((err, client, done) => {
|
||||
// Filter based on domain blocks, blocks, mutes, or custom filters:
|
||||
const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
|
||||
const accountDomain = payload.account.acct.split('@')[1];
|
||||
|
||||
// TODO: Move this logic out of the message handling loop
|
||||
pgPool.connect((err, client, releasePgConnection) => {
|
||||
if (err) {
|
||||
log.error(err);
|
||||
return;
|
||||
|
@ -682,40 +703,57 @@ const startWorker = async (workerId) => {
|
|||
SELECT 1
|
||||
FROM mutes
|
||||
WHERE account_id = $1
|
||||
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
|
||||
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
|
||||
];
|
||||
|
||||
if (accountDomain) {
|
||||
queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
|
||||
}
|
||||
|
||||
if (!unpackedPayload.filtered && !req.cachedFilters) {
|
||||
if (!payload.filtered && !req.cachedFilters) {
|
||||
queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
|
||||
}
|
||||
|
||||
Promise.all(queries).then(values => {
|
||||
done();
|
||||
releasePgConnection();
|
||||
|
||||
// Handling blocks & mutes and domain blocks: If one of those applies,
|
||||
// then we don't transmit the payload of the event to the client
|
||||
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!unpackedPayload.filtered && !req.cachedFilters) {
|
||||
// If the payload already contains the `filtered` property, it means
|
||||
// that filtering has been applied on the ruby on rails side, as
|
||||
// such, we don't need to construct or apply the filters in streaming:
|
||||
if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
|
||||
transmit(event, payload);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handling for constructing the custom filters and caching them on the request
|
||||
// TODO: Move this logic out of the message handling lifecycle
|
||||
if (!req.cachedFilters) {
|
||||
const filterRows = values[accountDomain ? 2 : 1].rows;
|
||||
|
||||
req.cachedFilters = filterRows.reduce((cache, row) => {
|
||||
if (cache[row.id]) {
|
||||
cache[row.id].keywords.push([row.keyword, row.whole_word]);
|
||||
req.cachedFilters = filterRows.reduce((cache, filter) => {
|
||||
if (cache[filter.id]) {
|
||||
cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
|
||||
} else {
|
||||
cache[row.id] = {
|
||||
keywords: [[row.keyword, row.whole_word]],
|
||||
expires_at: row.expires_at,
|
||||
repr: {
|
||||
id: row.id,
|
||||
title: row.title,
|
||||
context: row.context,
|
||||
expires_at: row.expires_at,
|
||||
filter_action: ['warn', 'hide'][row.filter_action],
|
||||
cache[filter.id] = {
|
||||
keywords: [[filter.keyword, filter.whole_word]],
|
||||
expires_at: filter.expires_at,
|
||||
filter: {
|
||||
id: filter.id,
|
||||
title: filter.title,
|
||||
context: filter.context,
|
||||
expires_at: filter.expires_at,
|
||||
// filter.filter_action is the value from the
|
||||
// custom_filters.action database column, it is an integer
|
||||
// representing a value in an enum defined by Ruby on Rails:
|
||||
//
|
||||
// enum { warn: 0, hide: 1 }
|
||||
filter_action: ['warn', 'hide'][filter.filter_action],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
@ -723,6 +761,10 @@ const startWorker = async (workerId) => {
|
|||
return cache;
|
||||
}, {});
|
||||
|
||||
// Construct the regular expressions for the custom filters: This
|
||||
// needs to be done in a separate loop as the database returns one
|
||||
// filterRow per keyword, so we need all the keywords before
|
||||
// constructing the regular expression
|
||||
Object.keys(req.cachedFilters).forEach((key) => {
|
||||
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
|
||||
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
|
||||
|
@ -742,31 +784,58 @@ const startWorker = async (workerId) => {
|
|||
});
|
||||
}
|
||||
|
||||
// Check filters
|
||||
if (req.cachedFilters && !unpackedPayload.filtered) {
|
||||
const status = unpackedPayload;
|
||||
const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
|
||||
const searchIndex = JSDOM.fragment(searchContent).textContent;
|
||||
// Apply cachedFilters against the payload, constructing a
|
||||
// `filter_results` array of FilterResult entities
|
||||
if (req.cachedFilters) {
|
||||
const status = payload;
|
||||
// TODO: Calculate searchableContent in Ruby on Rails:
|
||||
const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
|
||||
const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
|
||||
|
||||
const now = new Date();
|
||||
payload.filtered = [];
|
||||
Object.values(req.cachedFilters).forEach((cachedFilter) => {
|
||||
if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
|
||||
const keyword_matches = searchIndex.match(cachedFilter.regexp);
|
||||
if (keyword_matches) {
|
||||
payload.filtered.push({
|
||||
filter: cachedFilter.repr,
|
||||
keyword_matches,
|
||||
});
|
||||
}
|
||||
const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
|
||||
// Check the filter hasn't expired before applying:
|
||||
if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
|
||||
return results;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
transmit();
|
||||
// Just in-case JSDOM fails to find textContent in searchableContent
|
||||
if (!searchableTextContent) {
|
||||
return results;
|
||||
}
|
||||
|
||||
const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
|
||||
if (keyword_matches) {
|
||||
// results is an Array of FilterResult; status_matches is always
|
||||
// null as we only are only applying the keyword-based custom
|
||||
// filters, not the status-based custom filters.
|
||||
// https://docs.joinmastodon.org/entities/FilterResult/
|
||||
results.push({
|
||||
filter: cachedFilter.filter,
|
||||
keyword_matches,
|
||||
status_matches: null
|
||||
});
|
||||
}
|
||||
|
||||
return results;
|
||||
}, []);
|
||||
|
||||
// Send the payload + the FilterResults as the `filtered` property
|
||||
// to the streaming connection. To reach this code, the `event` must
|
||||
// have been either `update` or `status.update`, meaning the
|
||||
// `payload` is a Status entity, which has a `filtered` property:
|
||||
//
|
||||
// filtered: https://docs.joinmastodon.org/entities/Status/#filtered
|
||||
transmit(event, {
|
||||
...payload,
|
||||
filtered: filter_results
|
||||
});
|
||||
} else {
|
||||
transmit(event, payload);
|
||||
}
|
||||
}).catch(err => {
|
||||
releasePgConnection();
|
||||
log.error(err);
|
||||
done();
|
||||
});
|
||||
});
|
||||
};
|
||||
|
@ -775,7 +844,7 @@ const startWorker = async (workerId) => {
|
|||
subscribe(`${redisPrefix}${id}`, listener);
|
||||
});
|
||||
|
||||
if (attachCloseHandler) {
|
||||
if (typeof attachCloseHandler === 'function') {
|
||||
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
|
||||
}
|
||||
|
||||
|
@ -812,12 +881,13 @@ const startWorker = async (workerId) => {
|
|||
/**
|
||||
* @param {any} req
|
||||
* @param {function(): void} [closeHandler]
|
||||
* @return {function(string[]): void}
|
||||
* @returns {function(string[], SubscriptionListener): void}
|
||||
*/
|
||||
const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
|
||||
|
||||
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
|
||||
req.on('close', () => {
|
||||
ids.forEach(id => {
|
||||
unsubscribe(id);
|
||||
unsubscribe(id, listener);
|
||||
});
|
||||
|
||||
if (closeHandler) {
|
||||
|
@ -1077,7 +1147,7 @@ const startWorker = async (workerId) => {
|
|||
* @typedef WebSocketSession
|
||||
* @property {any} socket
|
||||
* @property {any} request
|
||||
* @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
|
||||
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
|
||||
*/
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in a new issue