Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream 20231103 #226

Merged
merged 2 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions app/services/fan_out_on_write_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class FanOutOnWriteService < BaseService
# @param [Hash] options
# @option options [Boolean] update
# @option options [Array<Integer>] silenced_account_ids
# @option options [Boolean] skip_notifications
def call(status, options = {})
@status = status
@account = status.account
Expand Down Expand Up @@ -42,8 +43,11 @@ def check_race_condition!

def fan_out_to_local_recipients!
deliver_to_self!
notify_mentioned_accounts!
notify_about_update! if update?

unless @options[:skip_notifications]
notify_mentioned_accounts!
notify_about_update! if update?
end

case @status.visibility.to_sym
when :public, :unlisted, :public_unlisted, :login, :private
Expand Down
9 changes: 7 additions & 2 deletions app/workers/thread_resolve_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ class ThreadResolveWorker
sidekiq_options queue: 'pull', retry: 3

def perform(child_status_id, parent_url, options = {})
child_status = Status.find(child_status_id)
parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
child_status = Status.find(child_status_id)
return if child_status.in_reply_to_id.present?

parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)

return if parent_status.nil?

child_status.thread = parent_status
child_status.save!

DistributionWorker.perform_async(child_status_id, { 'skip_notifications' => true }) if child_status.within_realtime_window?
rescue ActiveRecord::RecordNotFound
true
end
Expand Down
103 changes: 103 additions & 0 deletions spec/lib/activitypub/activity/create_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,109 @@
stub_request(:get, 'http://example.com/emojib.png').to_return(body: attachment_fixture('emojo.png'), headers: { 'Content-Type' => 'application/octet-stream' })
end

describe 'processing posts received out of order' do
let(:follower) { Fabricate(:account, username: 'bob') }

let(:object_json) do
{
id: [ActivityPub::TagManager.instance.uri_for(sender), 'post1'].join('/'),
type: 'Note',
to: [
'https://www.w3.org/ns/activitystreams#Public',
ActivityPub::TagManager.instance.uri_for(follower),
],
content: '@bob lorem ipsum',
published: 1.hour.ago.utc.iso8601,
updated: 1.hour.ago.utc.iso8601,
tag: {
type: 'Mention',
href: ActivityPub::TagManager.instance.uri_for(follower),
},
}
end

let(:reply_json) do
{
id: [ActivityPub::TagManager.instance.uri_for(sender), 'reply'].join('/'),
type: 'Note',
inReplyTo: object_json[:id],
to: [
'https://www.w3.org/ns/activitystreams#Public',
ActivityPub::TagManager.instance.uri_for(follower),
],
content: '@bob lorem ipsum',
published: Time.now.utc.iso8601,
updated: Time.now.utc.iso8601,
tag: {
type: 'Mention',
href: ActivityPub::TagManager.instance.uri_for(follower),
},
}
end

def activity_for_object(json)
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: [json[:id], 'activity'].join('/'),
type: 'Create',
actor: ActivityPub::TagManager.instance.uri_for(sender),
object: json,
}.with_indifferent_access
end

before do
follower.follow!(sender)
end

around do |example|
Sidekiq::Testing.fake! do
example.run
Sidekiq::Worker.clear_all
end
end

it 'correctly processes posts and inserts them in timelines', :aggregate_failures do
# Simulate a temporary failure preventing from fetching the parent post
stub_request(:get, object_json[:id]).to_return(status: 500)

# When receiving the reply…
described_class.new(activity_for_object(reply_json), sender, delivery: true).perform

# NOTE: Refering explicitly to the workers is a bit awkward
DistributionWorker.drain
FeedInsertWorker.drain

# …it creates a status with an unknown parent
reply = Status.find_by(uri: reply_json[:id])
expect(reply.reply?).to be true
expect(reply.in_reply_to_id).to be_nil

# …and creates a notification
expect(LocalNotificationWorker.jobs.size).to eq 1

# …but does not insert it into timelines
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_nil

# When receiving the parent…
described_class.new(activity_for_object(object_json), sender, delivery: true).perform

Sidekiq::Worker.drain_all

# …it creates a status and insert it into timelines
parent = Status.find_by(uri: object_json[:id])
expect(parent.reply?).to be false
expect(parent.in_reply_to_id).to be_nil
expect(reply.reload.in_reply_to_id).to eq parent.id

# Check that the both statuses have been inserted into the home feed
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), parent.id)).to be_within(0.1).of(parent.id.to_f)
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_within(0.1).of(reply.id.to_f)

# Creates two notifications
expect(Notification.count).to eq 2
end
end

describe '#perform' do
context 'when fetching' do
subject { described_class.new(json, sender) }
Expand Down
Loading