diff --git a/app/controllers/activitypub/inboxes_controller.rb b/app/controllers/activitypub/inboxes_controller.rb index 92dcb5ac77..a7592c4725 100644 --- a/app/controllers/activitypub/inboxes_controller.rb +++ b/app/controllers/activitypub/inboxes_controller.rb @@ -5,21 +5,24 @@ class ActivityPub::InboxesController < ActivityPub::BaseController include JsonLdHelper include AccountOwnedConcern - before_action :skip_unknown_actor_activity + before_action :skip_unknown_actor_activity! before_action :require_signature! skip_before_action :authenticate_user! + ACCEPTED_HEADERS = %w( + Collection-Synchronization + ).freeze + def create upgrade_account - process_collection_synchronization process_payload - head 202 + head(:accepted) end private - def skip_unknown_actor_activity - head 202 if unknown_affected_account? + def skip_unknown_actor_activity! + head(:accepted) if unknown_affected_account? end def unknown_affected_account? @@ -57,20 +60,16 @@ class ActivityPub::InboxesController < ActivityPub::BaseController DeliveryFailureTracker.reset!(signed_request_account.inbox_url) end - def process_collection_synchronization - raw_params = request.headers['Collection-Synchronization'] - return if raw_params.blank? || ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] == 'true' - - # Re-using the syntax for signature parameters - tree = SignatureParamsParser.new.parse(raw_params) - params = SignatureParamsTransformer.new.apply(tree) - - ActivityPub::PrepareFollowersSynchronizationService.new.call(signed_request_account, params) - rescue Parslet::ParseFailed - Rails.logger.warn 'Error parsing Collection-Synchronization header' + def process_payload + ActivityPub::ProcessingWorker.perform_async( + signed_request_account.id, + body, + @account&.id, + accepted_headers_from_request + ) end - def process_payload - ActivityPub::ProcessingWorker.perform_async(signed_request_account.id, body, @account&.id) + def accepted_headers_from_request + ACCEPTED_HEADERS.map { |header_name| [header_name, request.headers[header_name]] if request.headers[header_name].present? }.compact.to_h end end diff --git a/app/lib/account_reach_finder.rb b/app/lib/account_reach_finder.rb index 706ce8c1fb..571fca974c 100644 --- a/app/lib/account_reach_finder.rb +++ b/app/lib/account_reach_finder.rb @@ -1,12 +1,13 @@ # frozen_string_literal: true class AccountReachFinder - def initialize(account) + def initialize(account, options = {}) @account = account + @options = options end def inboxes - (followers_inboxes + reporters_inboxes + relay_inboxes).uniq + (followers_inboxes + reporters_inboxes + blocked_by_inboxes + relay_inboxes).uniq end private @@ -19,6 +20,14 @@ class AccountReachFinder Account.where(id: @account.targeted_reports.select(:account_id)).inboxes end + def blocked_by_inboxes + if @options[:with_blocking_accounts] + @account.blocked_by.inboxes + else + [] + end + end + def relay_inboxes Relay.enabled.pluck(:inbox_url) end diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index 4c13a80a67..2098cf978c 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -490,7 +490,16 @@ class ActivityPub::Activity::Create < ActivityPub::Activity def forward_for_reply return unless @status.distributable? && @json['signature'].present? && reply_to_local? - ActivityPub::RawDistributionWorker.perform_async(Oj.dump(@json), replied_to_status.account_id, [@account.preferred_inbox_url]) + # If the status is replying to a local status, we should forward it + # everywhere where that local status was distributed + + status_reach_finder = StatusReachFinder.new(@status) + sender_id = replied_to_status.account_id + exclude_inboxes = [@options[:relayed_through_account], @account].compact.map(&:preferred_inbox_url) + + ActivityPub::LowPriorityDeliveryWorker.push_bulk(status_reach_finder.inboxes - exclude_inboxes) do |inbox_url| + [payload, sender_id, inbox_url] + end end def increment_voters_count! diff --git a/app/lib/activitypub/activity/delete.rb b/app/lib/activitypub/activity/delete.rb index 801647cf74..ae7e2068de 100644 --- a/app/lib/activitypub/activity/delete.rb +++ b/app/lib/activitypub/activity/delete.rb @@ -37,22 +37,14 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity return if @status.nil? - forward! if @json['signature'].present? && @status.distributable? + forward! delete_now! end end - def rebloggers_ids - return @rebloggers_ids if defined?(@rebloggers_ids) - @rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id) - end - - def inboxes_for_reblogs - Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - end - def replied_to_status return @replied_to_status if defined?(@replied_to_status) + @replied_to_status = @status.thread end @@ -60,18 +52,19 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity !replied_to_status.nil? && replied_to_status.account.local? end - def inboxes_for_reply - replied_to_status.account.followers.inboxes - end - def forward! - inboxes = inboxes_for_reblogs - inboxes += inboxes_for_reply if reply_to_local? - inboxes -= [@account.preferred_inbox_url] + return unless @json['signature'].present? && @status.distributable? - sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first + # A remote status is being deleted. We could have local users that + # have reblogged it, who have remote followers. And if it's a reply + # to a local status, the parent would have forwarded it to + # lots of places - ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url| + status_reach_finder = StatusReachFinder.new(@status) + sender_id = Account.representative.id + exclude_inboxes = [@options[:relayed_through_account], @account].compact.map(&:preferred_inbox_url) + + ActivityPub::LowPriorityDeliveryWorker.push_bulk(status_reach_finder.inboxes - exclude_inboxes) do |inbox_url| [payload, sender_id, inbox_url] end end diff --git a/app/lib/status_reach_finder.rb b/app/lib/status_reach_finder.rb index 735d66a4f7..ded71f1154 100644 --- a/app/lib/status_reach_finder.rb +++ b/app/lib/status_reach_finder.rb @@ -1,10 +1,19 @@ # frozen_string_literal: true class StatusReachFinder - def initialize(status) - @status = status + DEFAULT_OPTIONS = { + with_parent: true, + }.freeze + + # @param [Status] status + # @param [Hash] options + # @option [Boolean] with_parent + def initialize(status, options = {}) + @status = status + @options = DEFAULT_OPTIONS.merge(options) end + # @return [Array] def inboxes (reached_account_inboxes + followers_inboxes + relay_inboxes).uniq end @@ -15,11 +24,15 @@ class StatusReachFinder # When the status is a reblog, there are no interactions with it # directly, we assume all interactions are with the original one - if @status.reblog? - [] - else - Account.where(id: reached_account_ids).inboxes - end + return [] if @status.reblog? + + # If a status is a reply to a local status, we also want to send + # it everywhere the parent status was sent + + arr = [] + arr.concat(self.class.new(@status.thread, with_parent: false).inboxes) if @status.in_reply_to_local_account? && @options[:with_parent] + arr.concat(Account.where(id: reached_account_ids).inboxes) + arr end def reached_account_ids @@ -28,6 +41,7 @@ class StatusReachFinder reblog_of_account_id, mentioned_account_ids, reblogs_account_ids, + reblogger_follower_account_ids, favourites_account_ids, replies_account_ids, ].tap do |arr| @@ -38,7 +52,7 @@ class StatusReachFinder end def replied_to_account_id - @status.in_reply_to_account_id + @status.in_reply_to_account_id if @status.local? end def reblog_of_account_id @@ -46,27 +60,27 @@ class StatusReachFinder end def mentioned_account_ids - @status.mentions.pluck(:account_id) + @status.mentions.pluck(:account_id) if @status.local? end def reblogs_account_ids - @status.reblogs.pluck(:account_id) + @reblogs_account_ids ||= @status.reblogs.pluck(:account_id) + end + + def reblogger_follower_account_ids + Follow.where(target_account_id: reblogs_account_ids).pluck(:account_id) end def favourites_account_ids - @status.favourites.pluck(:account_id) + @status.favourites.pluck(:account_id) if @status.local? end def replies_account_ids - @status.replies.pluck(:account_id) + @status.replies.pluck(:account_id) if @status.local? end def followers_inboxes - if @status.in_reply_to_local_account? && @status.distributable? - @status.account.followers.or(@status.thread.account.followers).inboxes - else - @status.account.followers.inboxes - end + @status.account.followers.inboxes end def relay_inboxes diff --git a/app/services/activitypub/prepare_followers_synchronization_service.rb b/app/services/activitypub/prepare_followers_synchronization_service.rb deleted file mode 100644 index 2d22ed701e..0000000000 --- a/app/services/activitypub/prepare_followers_synchronization_service.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -class ActivityPub::PrepareFollowersSynchronizationService < BaseService - include JsonLdHelper - - def call(account, params) - @account = account - - return if params['collectionId'] != @account.followers_url || invalid_origin?(params['url']) || @account.local_followers_hash == params['digest'] - - ActivityPub::FollowersSynchronizationWorker.perform_async(@account.id, params['url']) - end -end diff --git a/app/services/activitypub/process_collection_synchronization_service.rb b/app/services/activitypub/process_collection_synchronization_service.rb new file mode 100644 index 0000000000..ab15a6a85f --- /dev/null +++ b/app/services/activitypub/process_collection_synchronization_service.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +class ActivityPub::ProcessCollectionSynchronizationService < BaseService + include JsonLdHelper + + def call(account, value) + return unless collection_synchronization_enabled? + + @account = account + @params = parse_value(value) + + return if unknown_collection? || collection_up_to_date? + + ActivityPub::FollowersSynchronizationWorker.perform_async(@account.id, @params['url']) + rescue Parslet::ParseFailed + Rails.logger.warn 'Error parsing Collection-Synchronization header' + end + + private + + def parse_value(value) + SignatureVerification::SignatureParamsTransformer.new.apply(SignatureVerification::SignatureParamsParser.new.parse(value)) + end + + def unknown_collection? + @params['collectionId'] != @account.followers_url || invalid_origin?(@params['url']) + end + + def collection_up_to_date? + @account.local_followers_hash == @params['digest'] + end + + def collection_synchronization_enabled? + ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' + end +end diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index 788f2cf809..13e787ddf0 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -39,7 +39,7 @@ class ActivityPub::DeliveryWorker Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request| request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with]) request.add_headers(HEADERS) - request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' && @options[:synchronize_followers] + request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if synchronize_followers? end end @@ -47,6 +47,14 @@ class ActivityPub::DeliveryWorker "collectionId=\"#{account_followers_url(@source_account)}\", digest=\"#{@source_account.remote_followers_hash(@inbox_url)}\", url=\"#{account_followers_synchronization_url(@source_account)}\"" end + def synchronize_followers? + followers_synchronization_enabled? && @options[:synchronize_followers] + end + + def followers_synchronization_enabled? + ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' + end + def perform_request light = Stoplight(@inbox_url) do request_pool.with(@host) do |http_client| diff --git a/app/workers/activitypub/followers_synchronization_worker.rb b/app/workers/activitypub/followers_synchronization_worker.rb index 35a3ef0b96..0d1dd5cc79 100644 --- a/app/workers/activitypub/followers_synchronization_worker.rb +++ b/app/workers/activitypub/followers_synchronization_worker.rb @@ -6,9 +6,8 @@ class ActivityPub::FollowersSynchronizationWorker sidekiq_options queue: 'push', lock: :until_executed def perform(account_id, url) - @account = Account.find_by(id: account_id) - return true if @account.nil? - - ActivityPub::SynchronizeFollowersService.new.call(@account, url) + ActivityPub::SynchronizeFollowersService.new.call(Account.find(account_id), url) + rescue ActiveRecord::RecordNotFound + true end end diff --git a/app/workers/activitypub/move_distribution_worker.rb b/app/workers/activitypub/move_distribution_worker.rb index 65c5c0d1c3..766bc8d41c 100644 --- a/app/workers/activitypub/move_distribution_worker.rb +++ b/app/workers/activitypub/move_distribution_worker.rb @@ -1,33 +1,25 @@ # frozen_string_literal: true -class ActivityPub::MoveDistributionWorker - include Sidekiq::Worker - include Payloadable - - sidekiq_options queue: 'push' - +class ActivityPub::MoveDistributionWorker < ActivityPub::UpdateDistributionWorker + # Distribute a move activity to all servers that might have a copy of the + # account in question, including places that blocked the account, so that + # they have a chance to re-block the new account too def perform(migration_id) @migration = AccountMigration.find(migration_id) @account = @migration.account - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [signed_payload, @account.id, inbox_url] - end - - ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| - [signed_payload, @account.id, inbox_url] - end + distribute! rescue ActiveRecord::RecordNotFound true end - private + protected def inboxes - @inboxes ||= (@migration.account.followers.inboxes + @migration.account.blocked_by.inboxes).uniq + @inboxes ||= AccountReachFinder.new(@account, with_blocking_accounts: true).inboxes end - def signed_payload - @signed_payload ||= Oj.dump(serialize_payload(@migration, ActivityPub::MoveSerializer, signer: @account)) + def payload + @payload ||= Oj.dump(serialize_payload(@migration, ActivityPub::MoveSerializer, signer: @account)) end end diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb index cef5953194..94794e5bc2 100644 --- a/app/workers/activitypub/processing_worker.rb +++ b/app/workers/activitypub/processing_worker.rb @@ -5,9 +5,36 @@ class ActivityPub::ProcessingWorker sidekiq_options backtrace: true, retry: 8 - def perform(account_id, body, delivered_to_account_id = nil) - ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id), override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true) + def perform(account_id, body, delivered_to_account_id = nil, headers = {}) + @account = Account.find(account_id) + @body = body + @delivered_to_account_id = delivered_to_account_id + @headers = headers + + process_collection_synchronization! + process_collection! rescue ActiveRecord::RecordInvalid => e - Rails.logger.debug "Error processing incoming ActivityPub object: #{e}" + Rails.logger.debug "Error processing incoming ActivityPub payload: #{e}" + end + + private + + def process_collection_synchronization! + collection_synchronization = @headers['Collection-Synchronization'] + + return if collection_synchronization.blank? + + ActivityPub::ProcessCollectionSynchronizationService.new.call(@account, collection_synchronization) + end + + def process_collection! + ActivityPub::ProcessCollectionService.new.call( + @body, + @account, + override_timestamps: true, + delivered_to_account_id: @delivered_to_account_id, + delivery: true, + headers: @headers + ) end end diff --git a/spec/lib/status_reach_finder_spec.rb b/spec/lib/status_reach_finder_spec.rb new file mode 100644 index 0000000000..a6794cb17c --- /dev/null +++ b/spec/lib/status_reach_finder_spec.rb @@ -0,0 +1,145 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe StatusReachFinder do + describe '#inboxes' do + context 'for a local status' do + let(:parent_status) { nil } + let(:alice) { Fabricate(:account, username: 'alice') } + let(:status) { Fabricate(:status, account: alice, thread: parent_status) } + + subject { described_class.new(status) } + + context 'when it contains mentions of remote accounts' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + + before do + status.mentions.create!(account: bob) + end + + it 'includes the inbox of the reblogger' do + expect(subject.inboxes).to include 'https://foo.bar/inbox' + end + end + + context 'when it has been reblogged by a remote account' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + + before do + bob.statuses.create!(reblog: status) + end + + it 'includes the inbox of the reblogger' do + expect(subject.inboxes).to include 'https://foo.bar/inbox' + end + end + + context 'when it has been favourited by a remote account' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + + before do + bob.favourites.create!(status: status) + end + + it 'includes the inbox of the favouriter' do + expect(subject.inboxes).to include 'https://foo.bar/inbox' + end + end + + context 'when it has been replied to by a remote account' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + + before do + bob.statuses.create!(thread: status, text: 'Hoge') + end + + it 'includes the inbox of the replier' do + expect(subject.inboxes).to include 'https://foo.bar/inbox' + end + end + + context 'when it is a reply to a remote account' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + let(:parent_status) { Fabricate(:status, account: bob) } + + it 'includes the inbox of the replied-to account' do + expect(subject.inboxes).to include 'https://foo.bar/inbox' + end + end + end + + context 'for a remote status' do + let(:parent_status) { nil } + let(:alice) { Fabricate(:account, username: 'alice', domain: 'example.com') } + let(:status) { Fabricate(:status, account: alice, thread: parent_status) } + + subject { described_class.new(status) } + + context 'when it is a reply to a local status' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + let(:tom) { Fabricate(:account, username: 'tom', domain: 'bar.baz', protocol: :activitypub, inbox_url: 'https://bar.baz/inbox') } + let(:dan) { Fabricate(:account, username: 'dan', domain: 'baz.foo', protocol: :activitypub, inbox_url: 'https://baz.foo/inbox') } + + let(:parent_status) { Fabricate(:status) } + + before do + bob.follow!(parent_status.account) + tom.statuses.create!(reblog: parent_status) + parent_status.mentions.create!(account: dan) + end + + it 'includes inboxes of replied-to account\'s followers' do + expect(subject.inboxes).to include 'https://foo.bar/inbox' + end + + it 'includes inboxes of accounts that reblogged the replied-to status' do + expect(subject.inboxes).to include 'https://bar.baz/inbox' + end + + it 'includes inboxes of accounts mentioned in the replied-to status' do + expect(subject.inboxes).to include 'https://baz.foo/inbox' + end + end + + context 'when it has been reblogged by a local account' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + let(:tom) { Fabricate(:account, username: 'tom') } + + before do + bob.follow!(tom) + tom.statuses.create!(reblog: status) + end + + it 'includes inboxes of remote followers of the rebloggers' do + expect(subject.inboxes).to include 'https://foo.bar/inbox' + end + end + + context 'when it is a reply to a remote status' do + let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') } + let(:tom) { Fabricate(:account, username: 'tom', domain: 'bar.baz', protocol: :activitypub, inbox_url: 'https://bar.baz/inbox') } + let(:dan) { Fabricate(:account, username: 'dan', domain: 'baz.foo', protocol: :activitypub, inbox_url: 'https://baz.foo/inbox') } + + let(:parent_status) { Fabricate(:status, account: bob) } + + before do + parent_status.mentions.create!(account: dan) + status.mentions.create!(account: tom) + end + + it 'does not include inbox of replied-to account' do + expect(subject.inboxes).to_not include 'https://foo.bar/inbox' + end + + it 'does not include inboxes of accounts mentioned in the status' do + expect(subject.inboxes).to_not include 'https://bar.baz/inbox' + end + + it 'does not include inboxes of accounts mentioned in the replied-to status' do + expect(subject.inboxes).to_not include 'https://baz.foo/inbox' + end + end + end + end +end diff --git a/spec/services/remove_status_service_spec.rb b/spec/services/remove_status_service_spec.rb index 21fb0cd353..1418ae9b8b 100644 --- a/spec/services/remove_status_service_spec.rb +++ b/spec/services/remove_status_service_spec.rb @@ -1,7 +1,7 @@ require 'rails_helper' RSpec.describe RemoveStatusService, type: :service do - subject { RemoveStatusService.new } + subject { described_class.new } let!(:alice) { Fabricate(:account, user: Fabricate(:user)) } let!(:bob) { Fabricate(:account, username: 'bob', domain: 'example.com') } @@ -21,14 +21,18 @@ RSpec.describe RemoveStatusService, type: :service do Fabricate(:status, account: bill, reblog: @status, uri: 'hoge') end + def home_feed_of(account) + HomeFeed.new(account).get(10).map(&:id) + end + it 'removes status from author\'s home feed' do subject.call(@status) - expect(HomeFeed.new(alice).get(10)).to_not include(@status.id) + expect(home_feed_of(alice)).to_not include(@status.id) end it 'removes status from local follower\'s home feed' do subject.call(@status) - expect(HomeFeed.new(jeff).get(10)).to_not include(@status.id) + expect(home_feed_of(jeff)).to_not include(@status.id) end it 'sends delete activity to followers' do @@ -38,7 +42,7 @@ RSpec.describe RemoveStatusService, type: :service do it 'sends delete activity to rebloggers' do subject.call(@status) - expect(a_request(:post, 'http://example2.com/inbox')).to have_been_made + expect(a_request(:post, 'http://example2.com/inbox')).to have_been_made.once end it 'remove status from notifications' do