Fix posts from threads received out-of-order sometimes not being inserted into timelines (#27653)
This commit is contained in:
parent
81d7cfd544
commit
54a07731d1
3 changed files with 116 additions and 4 deletions
|
@ -8,6 +8,7 @@ class FanOutOnWriteService < BaseService
|
||||||
# @param [Hash] options
|
# @param [Hash] options
|
||||||
# @option options [Boolean] update
|
# @option options [Boolean] update
|
||||||
# @option options [Array<Integer>] silenced_account_ids
|
# @option options [Array<Integer>] silenced_account_ids
|
||||||
|
# @option options [Boolean] skip_notifications
|
||||||
def call(status, options = {})
|
def call(status, options = {})
|
||||||
@status = status
|
@status = status
|
||||||
@account = status.account
|
@account = status.account
|
||||||
|
@ -37,8 +38,11 @@ class FanOutOnWriteService < BaseService
|
||||||
|
|
||||||
def fan_out_to_local_recipients!
|
def fan_out_to_local_recipients!
|
||||||
deliver_to_self!
|
deliver_to_self!
|
||||||
notify_mentioned_accounts!
|
|
||||||
notify_about_update! if update?
|
unless @options[:skip_notifications]
|
||||||
|
notify_mentioned_accounts!
|
||||||
|
notify_about_update! if update?
|
||||||
|
end
|
||||||
|
|
||||||
case @status.visibility.to_sym
|
case @status.visibility.to_sym
|
||||||
when :public, :unlisted, :private
|
when :public, :unlisted, :private
|
||||||
|
|
|
@ -7,13 +7,18 @@ class ThreadResolveWorker
|
||||||
sidekiq_options queue: 'pull', retry: 3
|
sidekiq_options queue: 'pull', retry: 3
|
||||||
|
|
||||||
def perform(child_status_id, parent_url, options = {})
|
def perform(child_status_id, parent_url, options = {})
|
||||||
child_status = Status.find(child_status_id)
|
child_status = Status.find(child_status_id)
|
||||||
parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
|
return if child_status.in_reply_to_id.present?
|
||||||
|
|
||||||
|
parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
|
||||||
|
parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
|
||||||
|
|
||||||
return if parent_status.nil?
|
return if parent_status.nil?
|
||||||
|
|
||||||
child_status.thread = parent_status
|
child_status.thread = parent_status
|
||||||
child_status.save!
|
child_status.save!
|
||||||
|
|
||||||
|
DistributionWorker.perform_async(child_status_id, { 'skip_notifications' => true }) if child_status.within_realtime_window?
|
||||||
rescue ActiveRecord::RecordNotFound
|
rescue ActiveRecord::RecordNotFound
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
|
@ -23,6 +23,109 @@ RSpec.describe ActivityPub::Activity::Create do
|
||||||
stub_request(:get, 'http://example.com/emojib.png').to_return(body: attachment_fixture('emojo.png'), headers: { 'Content-Type' => 'application/octet-stream' })
|
stub_request(:get, 'http://example.com/emojib.png').to_return(body: attachment_fixture('emojo.png'), headers: { 'Content-Type' => 'application/octet-stream' })
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe 'processing posts received out of order' do
|
||||||
|
let(:follower) { Fabricate(:account, username: 'bob') }
|
||||||
|
|
||||||
|
let(:object_json) do
|
||||||
|
{
|
||||||
|
id: [ActivityPub::TagManager.instance.uri_for(sender), 'post1'].join('/'),
|
||||||
|
type: 'Note',
|
||||||
|
to: [
|
||||||
|
'https://www.w3.org/ns/activitystreams#Public',
|
||||||
|
ActivityPub::TagManager.instance.uri_for(follower),
|
||||||
|
],
|
||||||
|
content: '@bob lorem ipsum',
|
||||||
|
published: 1.hour.ago.utc.iso8601,
|
||||||
|
updated: 1.hour.ago.utc.iso8601,
|
||||||
|
tag: {
|
||||||
|
type: 'Mention',
|
||||||
|
href: ActivityPub::TagManager.instance.uri_for(follower),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:reply_json) do
|
||||||
|
{
|
||||||
|
id: [ActivityPub::TagManager.instance.uri_for(sender), 'reply'].join('/'),
|
||||||
|
type: 'Note',
|
||||||
|
inReplyTo: object_json[:id],
|
||||||
|
to: [
|
||||||
|
'https://www.w3.org/ns/activitystreams#Public',
|
||||||
|
ActivityPub::TagManager.instance.uri_for(follower),
|
||||||
|
],
|
||||||
|
content: '@bob lorem ipsum',
|
||||||
|
published: Time.now.utc.iso8601,
|
||||||
|
updated: Time.now.utc.iso8601,
|
||||||
|
tag: {
|
||||||
|
type: 'Mention',
|
||||||
|
href: ActivityPub::TagManager.instance.uri_for(follower),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
def activity_for_object(json)
|
||||||
|
{
|
||||||
|
'@context': 'https://www.w3.org/ns/activitystreams',
|
||||||
|
id: [json[:id], 'activity'].join('/'),
|
||||||
|
type: 'Create',
|
||||||
|
actor: ActivityPub::TagManager.instance.uri_for(sender),
|
||||||
|
object: json,
|
||||||
|
}.with_indifferent_access
|
||||||
|
end
|
||||||
|
|
||||||
|
before do
|
||||||
|
follower.follow!(sender)
|
||||||
|
end
|
||||||
|
|
||||||
|
around do |example|
|
||||||
|
Sidekiq::Testing.fake! do
|
||||||
|
example.run
|
||||||
|
Sidekiq::Worker.clear_all
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'correctly processes posts and inserts them in timelines', :aggregate_failures do
|
||||||
|
# Simulate a temporary failure preventing from fetching the parent post
|
||||||
|
stub_request(:get, object_json[:id]).to_return(status: 500)
|
||||||
|
|
||||||
|
# When receiving the reply…
|
||||||
|
described_class.new(activity_for_object(reply_json), sender, delivery: true).perform
|
||||||
|
|
||||||
|
# NOTE: Refering explicitly to the workers is a bit awkward
|
||||||
|
DistributionWorker.drain
|
||||||
|
FeedInsertWorker.drain
|
||||||
|
|
||||||
|
# …it creates a status with an unknown parent
|
||||||
|
reply = Status.find_by(uri: reply_json[:id])
|
||||||
|
expect(reply.reply?).to be true
|
||||||
|
expect(reply.in_reply_to_id).to be_nil
|
||||||
|
|
||||||
|
# …and creates a notification
|
||||||
|
expect(LocalNotificationWorker.jobs.size).to eq 1
|
||||||
|
|
||||||
|
# …but does not insert it into timelines
|
||||||
|
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_nil
|
||||||
|
|
||||||
|
# When receiving the parent…
|
||||||
|
described_class.new(activity_for_object(object_json), sender, delivery: true).perform
|
||||||
|
|
||||||
|
Sidekiq::Worker.drain_all
|
||||||
|
|
||||||
|
# …it creates a status and insert it into timelines
|
||||||
|
parent = Status.find_by(uri: object_json[:id])
|
||||||
|
expect(parent.reply?).to be false
|
||||||
|
expect(parent.in_reply_to_id).to be_nil
|
||||||
|
expect(reply.reload.in_reply_to_id).to eq parent.id
|
||||||
|
|
||||||
|
# Check that the both statuses have been inserted into the home feed
|
||||||
|
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), parent.id)).to be_within(0.1).of(parent.id.to_f)
|
||||||
|
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_within(0.1).of(reply.id.to_f)
|
||||||
|
|
||||||
|
# Creates two notifications
|
||||||
|
expect(Notification.count).to eq 2
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe '#perform' do
|
describe '#perform' do
|
||||||
context 'when fetching' do
|
context 'when fetching' do
|
||||||
subject { described_class.new(json, sender) }
|
subject { described_class.new(json, sender) }
|
||||||
|
|
Loading…
Reference in a new issue