Compare commits

...

15 commits

Author SHA1 Message Date
09a5c13129 Merge tag 'v4.1.6' of https://github.com/mastodon/mastodon into paravielfalt-4.1
All checks were successful
continuous-integration/drone/tag Build is passing
2023-08-06 13:17:41 +02:00
Claire
ac7d40b561 Bump version to v4.1.6 2023-07-31 14:33:06 +02:00
Renaud Chaput
2fc6117d1b Fix missing return values in streaming (#26233) 2023-07-31 14:33:06 +02:00
Emelia Smith
2eb1a5b7b6 Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228) 2023-07-31 14:33:06 +02:00
Claire
6c321bb5e1 Fix incorrect connect timeout in outgoing requests (#26116) 2023-07-31 14:33:06 +02:00
Emelia Smith
da230600ac Refactor streaming's filtering logic & improve documentation (#26213) 2023-07-31 14:33:06 +02:00
Claire
1792be342a Fix wrong filters sometimes applying in streaming (#26159) 2023-07-31 14:33:06 +02:00
e942923019 Merge tag 'v4.1.5' of https://github.com/mastodon/mastodon into paravielfalt-4.1
All checks were successful
continuous-integration/drone/tag Build is passing
2023-07-23 20:00:11 +02:00
Claire
ebf4f034c2 Bump version to v4.1.5 2023-07-21 16:07:43 +02:00
Claire
889102013f Fix CSP headers being unintendedly wide (#26105) 2023-07-21 16:07:43 +02:00
Claire
d94a2c8aca Change request timeout handling to use a longer deadline (#26055) 2023-07-21 16:07:43 +02:00
Claire
efd066670d Fix moderation interface for remote instances with a .zip TLD (#25885) 2023-07-21 16:07:43 +02:00
Claire
13ec425b72 Fix remote accounts being possibly persisted to database with incomplete protocol values (#25886) 2023-07-21 16:07:43 +02:00
Michael Stanclift
7a99f0744d Fix trending publishers table not rendering correctly on narrow screens (#25945) 2023-07-21 16:07:43 +02:00
Claire
69c8f26946
Add check preventing Sidekiq workers from running with Makara configured (#25850)
Co-authored-by: Eugen Rochko <eugen@zeonfederated.com>
2023-07-21 14:18:04 +02:00
11 changed files with 225 additions and 82 deletions

View file

@ -3,6 +3,34 @@ Changelog
All notable changes to this project will be documented in this file.
## [4.1.6] - 2023-07-31
### Fixed
- Fix memory leak in streaming server ([ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26228))
- Fix wrong filters sometimes applying in streaming ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26159), [ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26213), [renchap](https://github.com/mastodon/mastodon/pull/26233))
- Fix incorrect connect timeout in outgoing requests ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26116))
## [4.1.5] - 2023-07-21
### Added
- Add check preventing Sidekiq workers from running with Makara configured ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/25850))
### Changed
- Change request timeout handling to use a longer deadline ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26055))
### Fixed
- Fix moderation interface for remote instances with a .zip TLD ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/25886))
- Fix remote accounts being possibly persisted to database with incomplete protocol values ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/25886))
- Fix trending publishers table not rendering correctly on narrow screens ([vmstan](https://github.com/mastodon/mastodon/pull/25945))
### Security
- Fix CSP headers being unintentionally wide ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26105))
## [4.1.4] - 2023-07-07
### Fixed

View file

@ -4,14 +4,22 @@ require 'ipaddr'
require 'socket'
require 'resolv'
# Monkey-patch the HTTP.rb timeout class to avoid using a timeout block
# Use our own timeout class to avoid using HTTP.rb's timeout block
# around the Socket#open method, since we use our own timeout blocks inside
# that method
#
# Also changes how the read timeout behaves so that it is cumulative (closer
# to HTTP::Timeout::Global, but still having distinct timeouts for other
# operation types)
class HTTP::Timeout::PerOperation
class PerOperationWithDeadline < HTTP::Timeout::PerOperation
READ_DEADLINE = 30
def initialize(*args)
super
@read_deadline = options.fetch(:read_deadline, READ_DEADLINE)
end
def connect(socket_class, host, port, nodelay = false)
@socket = socket_class.open(host, port)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) if nodelay
@ -24,7 +32,7 @@ class HTTP::Timeout::PerOperation
# Read data from the socket
def readpartial(size, buffer = nil)
@deadline ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) + @read_timeout
@deadline ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) + @read_deadline
timeout = false
loop do
@ -33,7 +41,8 @@ class HTTP::Timeout::PerOperation
return :eof if result.nil?
remaining_time = @deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
raise HTTP::TimeoutError, "Read timed out after #{@read_timeout} seconds" if timeout || remaining_time <= 0
raise HTTP::TimeoutError, "Read timed out after #{@read_timeout} seconds" if timeout
raise HTTP::TimeoutError, "Read timed out after a total of #{@read_deadline} seconds" if remaining_time <= 0
return result if result != :wait_readable
# marking the socket for timeout. Why is this not being raised immediately?
@ -46,7 +55,7 @@ class HTTP::Timeout::PerOperation
# timeout. Else, the first timeout was a proper timeout.
# This hack has to be done because io/wait#wait_readable doesn't provide a value for when
# the socket is closed by the server, and HTTP::Parser doesn't provide the limit for the chunks.
timeout = true unless @socket.to_io.wait_readable(remaining_time)
timeout = true unless @socket.to_io.wait_readable([remaining_time, @read_timeout].min)
end
end
end
@ -57,7 +66,7 @@ class Request
# We enforce a 5s timeout on DNS resolving, 5s timeout on socket opening
# and 5s timeout on the TLS handshake, meaning the worst case should take
# about 15s in total
TIMEOUT = { connect: 5, read: 10, write: 10 }.freeze
TIMEOUT = { connect_timeout: 5, read_timeout: 10, write_timeout: 10, read_deadline: 30 }.freeze
include RoutingHelper
@ -69,6 +78,7 @@ class Request
@http_client = options.delete(:http_client)
@allow_local = options.delete(:allow_local)
@options = options.merge(socket_class: use_proxy? || @allow_local ? ProxySocket : Socket)
@options = @options.merge(timeout_class: PerOperationWithDeadline, timeout_options: TIMEOUT)
@options = @options.merge(proxy_url) if use_proxy?
@headers = {}
@ -129,7 +139,7 @@ class Request
end
def http_client
HTTP.use(:auto_inflate).timeout(TIMEOUT.dup).follow(max_hops: 3)
HTTP.use(:auto_inflate).follow(max_hops: 3)
end
end
@ -275,11 +285,11 @@ class Request
end
until socks.empty?
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect])
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect_timeout])
if available_socks.nil?
socks.each(&:close)
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds"
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect_timeout]} seconds"
end
available_socks.each do |sock|

View file

@ -76,6 +76,9 @@ class ActivityPub::ProcessAccountService < BaseService
@account.suspended_at = domain_block.created_at if auto_suspend?
@account.suspension_origin = :local if auto_suspend?
@account.silenced_at = domain_block.created_at if auto_silence?
set_immediate_protocol_attributes!
@account.save
end

View file

@ -29,7 +29,7 @@
- Trends::PreviewCardProviderFilter::KEYS.each do |key|
= hidden_field_tag key, params[key] if params[key].present?
.batch-table.optional
.batch-table
.batch-table__toolbar
%label.batch-table__toolbar__select.batch-checkbox-all
= check_box_tag :batch_checkbox_all, nil, false

View file

@ -3,7 +3,7 @@
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy
def host_to_url(str)
"http#{Rails.configuration.x.use_https ? 's' : ''}://#{str}".split('/').first if str.present?
"http#{Rails.configuration.x.use_https ? 's' : ''}://#{str.split('/').first}" if str.present?
end
base_host = Rails.configuration.x.web_domain

View file

@ -3,6 +3,11 @@
require_relative '../../lib/mastodon/sidekiq_middleware'
Sidekiq.configure_server do |config|
if Rails.configuration.database_configuration.dig('production', 'adapter') == 'postgresql_makara'
STDERR.puts 'ERROR: Database replication is not currently supported in Sidekiq workers. Check your configuration.'
exit 1
end
config.redis = REDIS_SIDEKIQ_PARAMS
config.server_middleware do |chain|

View file

@ -292,7 +292,7 @@ Rails.application.routes.draw do
end
end
resources :instances, only: [:index, :show, :destroy], constraints: { id: /[^\/]+/ } do
resources :instances, only: [:index, :show, :destroy], constraints: { id: /[^\/]+/ }, format: 'html' do
member do
post :clear_delivery_errors
post :restart_delivery

View file

@ -56,7 +56,7 @@ services:
web:
build: .
image: ghcr.io/mastodon/mastodon
image: ghcr.io/mastodon/mastodon:v4.1.6
restart: always
env_file: .env.production
command: bash -c "rm -f /mastodon/tmp/pids/server.pid; bundle exec rails s -p 3000"
@ -77,7 +77,7 @@ services:
streaming:
build: .
image: ghcr.io/mastodon/mastodon
image: ghcr.io/mastodon/mastodon:v4.1.6
restart: always
env_file: .env.production
command: node ./streaming
@ -95,7 +95,7 @@ services:
sidekiq:
build: .
image: ghcr.io/mastodon/mastodon
image: ghcr.io/mastodon/mastodon:v4.1.6
restart: always
env_file: .env.production
command: bundle exec sidekiq

View file

@ -13,7 +13,7 @@ module Mastodon
end
def patch
4
6
end
def flags

View file

@ -0,0 +1,27 @@
# frozen_string_literal: true
require 'rails_helper'
describe 'Content-Security-Policy' do
it 'sets the expected CSP headers' do
allow(SecureRandom).to receive(:base64).with(16).and_return('ZbA+JmE7+bK8F5qvADZHuQ==')
get '/'
expect(response.headers['Content-Security-Policy'].split(';').map(&:strip)).to contain_exactly(
"base-uri 'none'",
"default-src 'none'",
"frame-ancestors 'none'",
"font-src 'self' https://cb6e6126.ngrok.io",
"img-src 'self' https: data: blob: https://cb6e6126.ngrok.io",
"style-src 'self' https://cb6e6126.ngrok.io 'nonce-ZbA+JmE7+bK8F5qvADZHuQ=='",
"media-src 'self' https: data: https://cb6e6126.ngrok.io",
"frame-src 'self' https:",
"manifest-src 'self' https://cb6e6126.ngrok.io",
"form-action 'self'",
"child-src 'self' blob: https://cb6e6126.ngrok.io",
"worker-src 'self' blob: https://cb6e6126.ngrok.io",
"connect-src 'self' data: blob: https://cb6e6126.ngrok.io https://cb6e6126.ngrok.io ws://localhost:4000",
"script-src 'self' https://cb6e6126.ngrok.io 'wasm-unsafe-eval'"
)
end
end

View file

@ -226,9 +226,15 @@ const startWorker = async (workerId) => {
callbacks.forEach(callback => callback(json));
};
/**
* @callback SubscriptionListener
* @param {ReturnType<parseJSON>} json of the message
* @returns void
*/
/**
* @param {string} channel
* @param {function(string): void} callback
* @param {SubscriptionListener} callback
*/
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`);
@ -245,7 +251,7 @@ const startWorker = async (workerId) => {
/**
* @param {string} channel
* @param {function(Object<string, any>): void} callback
* @param {SubscriptionListener} callback
*/
const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`);
@ -623,51 +629,66 @@ const startWorker = async (workerId) => {
* @param {string[]} ids
* @param {any} req
* @param {function(string, string): void} output
* @param {function(string[], function(string): void): void} attachCloseHandler
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
* @param {boolean=} needsFiltering
* @returns {function(object): void}
* @returns {SubscriptionListener}
*/
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress;
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
// Currently message is of type string, soon it'll be Record<string, any>
const transmit = (event, payload) => {
// TODO: Replace "string"-based delete payloads with object payloads:
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
output(event, encodedPayload);
};
// The listener used to process each message off the redis subscription,
// message here is an object with an `event` and `payload` property. Some
// events also include a queued_at value, but this is being removed shortly.
/** @type {SubscriptionListener} */
const listener = message => {
const { event, payload, queued_at } = message;
const { event, payload } = message;
const transmit = () => {
const now = new Date().getTime();
const delta = now - queued_at;
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
output(event, encodedPayload);
};
// Only messages that may require filtering are statuses, since notifications
// are already personalized and deletes do not matter
if (!needsFiltering || event !== 'update') {
transmit();
// Streaming only needs to apply filtering to some channels and only to
// some events. This is because majority of the filtering happens on the
// Ruby on Rails side when producing the event for streaming.
//
// The only events that require filtering from the streaming server are
// `update` and `status.update`, all other events are transmitted to the
// client as soon as they're received (pass-through).
//
// The channels that need filtering are determined in the function
// `channelNameToIds` defined below:
if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
transmit(event, payload);
return;
}
const unpackedPayload = payload;
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
const accountDomain = unpackedPayload.account.acct.split('@')[1];
// The rest of the logic from here on in this function is to handle
// filtering of statuses:
if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
// Filter based on language:
if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
return;
}
// When the account is not logged in, it is not necessary to confirm the block or mute
if (!req.accountId) {
transmit();
transmit(event, payload);
return;
}
pgPool.connect((err, client, done) => {
// Filter based on domain blocks, blocks, mutes, or custom filters:
const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
const accountDomain = payload.account.acct.split('@')[1];
// TODO: Move this logic out of the message handling loop
pgPool.connect((err, client, releasePgConnection) => {
if (err) {
log.error(err);
return;
@ -682,40 +703,57 @@ const startWorker = async (workerId) => {
SELECT 1
FROM mutes
WHERE account_id = $1
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
];
if (accountDomain) {
queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
}
if (!unpackedPayload.filtered && !req.cachedFilters) {
if (!payload.filtered && !req.cachedFilters) {
queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
}
Promise.all(queries).then(values => {
done();
releasePgConnection();
// Handling blocks & mutes and domain blocks: If one of those applies,
// then we don't transmit the payload of the event to the client
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
return;
}
if (!unpackedPayload.filtered && !req.cachedFilters) {
// If the payload already contains the `filtered` property, it means
// that filtering has been applied on the ruby on rails side, as
// such, we don't need to construct or apply the filters in streaming:
if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
transmit(event, payload);
return;
}
// Handling for constructing the custom filters and caching them on the request
// TODO: Move this logic out of the message handling lifecycle
if (!req.cachedFilters) {
const filterRows = values[accountDomain ? 2 : 1].rows;
req.cachedFilters = filterRows.reduce((cache, row) => {
if (cache[row.id]) {
cache[row.id].keywords.push([row.keyword, row.whole_word]);
req.cachedFilters = filterRows.reduce((cache, filter) => {
if (cache[filter.id]) {
cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
} else {
cache[row.id] = {
keywords: [[row.keyword, row.whole_word]],
expires_at: row.expires_at,
repr: {
id: row.id,
title: row.title,
context: row.context,
expires_at: row.expires_at,
filter_action: ['warn', 'hide'][row.filter_action],
cache[filter.id] = {
keywords: [[filter.keyword, filter.whole_word]],
expires_at: filter.expires_at,
filter: {
id: filter.id,
title: filter.title,
context: filter.context,
expires_at: filter.expires_at,
// filter.filter_action is the value from the
// custom_filters.action database column, it is an integer
// representing a value in an enum defined by Ruby on Rails:
//
// enum { warn: 0, hide: 1 }
filter_action: ['warn', 'hide'][filter.filter_action],
},
};
}
@ -723,6 +761,10 @@ const startWorker = async (workerId) => {
return cache;
}, {});
// Construct the regular expressions for the custom filters: This
// needs to be done in a separate loop as the database returns one
// filterRow per keyword, so we need all the keywords before
// constructing the regular expression
Object.keys(req.cachedFilters).forEach((key) => {
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
@ -742,31 +784,58 @@ const startWorker = async (workerId) => {
});
}
// Check filters
if (req.cachedFilters && !unpackedPayload.filtered) {
const status = unpackedPayload;
const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
const searchIndex = JSDOM.fragment(searchContent).textContent;
// Apply cachedFilters against the payload, constructing a
// `filter_results` array of FilterResult entities
if (req.cachedFilters) {
const status = payload;
// TODO: Calculate searchableContent in Ruby on Rails:
const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
const now = new Date();
payload.filtered = [];
Object.values(req.cachedFilters).forEach((cachedFilter) => {
if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
const keyword_matches = searchIndex.match(cachedFilter.regexp);
if (keyword_matches) {
payload.filtered.push({
filter: cachedFilter.repr,
keyword_matches,
});
}
const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
// Check the filter hasn't expired before applying:
if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
return results;
}
});
}
transmit();
// Just in-case JSDOM fails to find textContent in searchableContent
if (!searchableTextContent) {
return results;
}
const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
if (keyword_matches) {
// results is an Array of FilterResult; status_matches is always
// null as we only are only applying the keyword-based custom
// filters, not the status-based custom filters.
// https://docs.joinmastodon.org/entities/FilterResult/
results.push({
filter: cachedFilter.filter,
keyword_matches,
status_matches: null
});
}
return results;
}, []);
// Send the payload + the FilterResults as the `filtered` property
// to the streaming connection. To reach this code, the `event` must
// have been either `update` or `status.update`, meaning the
// `payload` is a Status entity, which has a `filtered` property:
//
// filtered: https://docs.joinmastodon.org/entities/Status/#filtered
transmit(event, {
...payload,
filtered: filter_results
});
} else {
transmit(event, payload);
}
}).catch(err => {
releasePgConnection();
log.error(err);
done();
});
});
};
@ -775,7 +844,7 @@ const startWorker = async (workerId) => {
subscribe(`${redisPrefix}${id}`, listener);
});
if (attachCloseHandler) {
if (typeof attachCloseHandler === 'function') {
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
}
@ -812,12 +881,13 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {function(): void} [closeHandler]
* @return {function(string[]): void}
* @returns {function(string[], SubscriptionListener): void}
*/
const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
req.on('close', () => {
ids.forEach(id => {
unsubscribe(id);
unsubscribe(id, listener);
});
if (closeHandler) {
@ -1077,7 +1147,7 @@ const startWorker = async (workerId) => {
* @typedef WebSocketSession
* @property {any} socket
* @property {any} request
* @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/
/**