Merge tag 'v3.4.0' into hometown-dev

This commit is contained in:
Darius Kazemi
2021-05-17 13:48:27 -07:00
897 changed files with 26918 additions and 14941 deletions

View File

@ -144,7 +144,7 @@ class ActivityPub::Activity
end
def delete_later!(uri)
redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri)
redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, true)
end
def status_from_object
@ -210,12 +210,22 @@ class ActivityPub::Activity
end
end
def lock_or_return(key, expire_after = 7.days.seconds)
def lock_or_return(key, expire_after = 2.hours.seconds)
yield if redis.set(key, true, nx: true, ex: expire_after)
ensure
redis.del(key)
end
def lock_or_fail(key)
RedisLock.acquire({ redis: Redis.current, key: key }) do |lock|
if lock.acquired?
yield
else
raise Mastodon::RaceConditionError
end
end
end
def fetch?
!@options[:delivery]
end

View File

@ -4,29 +4,29 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
def perform
return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
original_status = status_from_object
lock_or_fail("announce:#{@object['id']}") do
original_status = status_from_object
return reject_payload! if original_status.nil? || !announceable?(original_status)
return reject_payload! if original_status.nil? || !announceable?(original_status)
@status = Status.find_by(account: @account, reblog: original_status)
@status = Status.find_by(account: @account, reblog: original_status)
return @status unless @status.nil?
return @status unless @status.nil?
@status = Status.create!(
account: @account,
reblog: original_status,
uri: @json['id'],
created_at: @json['published'],
override_timestamps: @options[:override_timestamps],
visibility: visibility_from_audience
)
@status = Status.create!(
account: @account,
reblog: original_status,
uri: @json['id'],
created_at: @json['published'],
override_timestamps: @options[:override_timestamps],
visibility: visibility_from_audience
)
distribute(@status)
else
raise Mastodon::RaceConditionError
original_status.tags.each do |tag|
tag.use!(@account)
end
distribute(@status)
end
@status
@ -43,9 +43,9 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
end
def visibility_from_audience
if audience_to.include?(ActivityPub::TagManager::COLLECTIONS[:public])
if audience_to.any? { |to| ActivityPub::TagManager.instance.public_collection?(to) }
:public
elsif audience_cc.include?(ActivityPub::TagManager::COLLECTIONS[:public])
elsif audience_cc.any? { |cc| ActivityPub::TagManager.instance.public_collection?(cc) }
:unlisted
elsif audience_to.include?(@account.followers_url)
:private
@ -69,8 +69,4 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
def reblog_of_local_status?
status_from_uri(object_uri)&.account&.local?
end
def lock_options
{ redis: Redis.current, key: "announce:#{@object['id']}" }
end
end

View File

@ -11,8 +11,13 @@ class ActivityPub::Activity::Block < ActivityPub::Activity
return
end
UnfollowService.new.call(@account, target_account) if @account.following?(target_account)
UnfollowService.new.call(target_account, @account) if target_account.following?(@account)
RejectFollowService.new.call(target_account, @account) if target_account.requested?(@account)
@account.block!(target_account, uri: @json['id']) unless delete_arrived_first?(@json['id'])
unless delete_arrived_first?(@json['id'])
BlockWorker.perform_async(@account.id, target_account.id)
@account.block!(target_account, uri: @json['id'])
end
end
end

View File

@ -45,19 +45,15 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def create_status
return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
lock_or_fail("create:#{object_uri}") do
return if delete_arrived_first?(object_uri) || poll_vote?
@status = find_existing_status
@status = find_existing_status
if @status.nil?
process_status
elsif @options[:delivered_to_account_id].present?
postprocess_audience_and_deliver
end
else
raise Mastodon::RaceConditionError
if @status.nil?
process_status
elsif @options[:delivered_to_account_id].present?
postprocess_audience_and_deliver
end
end
@ -89,7 +85,6 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
resolve_thread(@status)
fetch_replies(@status)
check_for_spam
distribute(@status)
forward_for_reply
end
@ -175,7 +170,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def process_audience
(audience_to + audience_cc).uniq.each do |audience|
next if audience == ActivityPub::TagManager::COLLECTIONS[:public]
next if ActivityPub::TagManager.instance.public_collection?(audience)
# Unlike with tags, there is no point in resolving accounts we don't already
# know here, because silent mentions would only be used for local access
@ -221,7 +216,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def attach_tags(status)
@tags.each do |tag|
status.tags << tag
TrendingTags.record_use!(tag, status.account, status.created_at) if status.public_visibility?
tag.use!(@account, status: status, at_time: status.created_at) if status.public_visibility?
end
@mentions.each do |mention|
@ -366,13 +361,9 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
poll = replied_to_status.preloadable_poll
already_voted = true
RedisLock.acquire(poll_lock_options) do |lock|
if lock.acquired?
already_voted = poll.votes.where(account: @account).exists?
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
else
raise Mastodon::RaceConditionError
end
lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do
already_voted = poll.votes.where(account: @account).exists?
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
end
increment_voters_count! unless already_voted
@ -408,9 +399,9 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
end
def visibility_from_audience
if audience_to.include?(ActivityPub::TagManager::COLLECTIONS[:public])
if audience_to.any? { |to| ActivityPub::TagManager.instance.public_collection?(to) }
:public
elsif audience_cc.include?(ActivityPub::TagManager::COLLECTIONS[:public])
elsif audience_cc.any? { |cc| ActivityPub::TagManager.instance.public_collection?(cc) }
:unlisted
elsif audience_to.include?(@account.followers_url)
:private
@ -546,10 +537,6 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
Tombstone.exists?(uri: object_uri)
end
def check_for_spam
SpamCheck.perform(@status)
end
def forward_for_reply
return unless @status.distributable? && @json['signature'].present? && reply_to_local?
@ -567,12 +554,4 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
poll.reload
retry
end
def lock_options
{ redis: Redis.current, key: "create:#{object_uri}" }
end
def poll_lock_options
{ redis: Redis.current, key: "vote:#{replied_to_status.poll_id}:#{@account.id}" }
end
end

View File

@ -20,33 +20,35 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
def delete_note
return if object_uri.nil?
unless invalid_origin?(object_uri)
RedisLock.acquire(lock_options) { |_lock| delete_later!(object_uri) }
Tombstone.find_or_create_by(uri: object_uri, account: @account)
lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do
unless invalid_origin?(object_uri)
# This lock ensures a concurrent `ActivityPub::Activity::Create` either
# does not create a status at all, or has finished saving it to the
# database before we try to load it.
# Without the lock, `delete_later!` could be called after `delete_arrived_first?`
# and `Status.find` before `Status.create!`
lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) }
Tombstone.find_or_create_by(uri: object_uri, account: @account)
end
@status = Status.find_by(uri: object_uri, account: @account)
@status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
return if @status.nil?
forward! if @json['signature'].present? && @status.distributable?
delete_now!
end
@status = Status.find_by(uri: object_uri, account: @account)
@status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
return if @status.nil?
if @status.distributable?
forward_for_reply
forward_for_reblogs
end
delete_now!
end
def forward_for_reblogs
return if @json['signature'].blank?
def rebloggers_ids
return @rebloggers_ids if defined?(@rebloggers_ids)
@rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
end
rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
inboxes = Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - [@account.preferred_inbox_url]
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, rebloggers_ids.first, inbox_url]
end
def inboxes_for_reblogs
Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes
end
def replied_to_status
@ -58,13 +60,19 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
!replied_to_status.nil? && replied_to_status.account.local?
end
def forward_for_reply
return unless @json['signature'].present? && reply_to_local?
def inboxes_for_reply
replied_to_status.account.followers.inboxes
end
inboxes = replied_to_status.account.followers.inboxes - [@account.preferred_inbox_url]
def forward!
inboxes = inboxes_for_reblogs
inboxes += inboxes_for_reply if reply_to_local?
inboxes -= [@account.preferred_inbox_url]
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, replied_to_status.account_id, inbox_url]
sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url|
[payload, sender_id, inbox_url]
end
end
@ -75,8 +83,4 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
def payload
@payload ||= Oj.dump(@json)
end
def lock_options
{ redis: Redis.current, key: "create:#{object_uri}" }
end
end

View File

@ -4,12 +4,14 @@ class ActivityPub::Activity::Flag < ActivityPub::Activity
def perform
return if skip_reports?
target_accounts = object_uris.map { |uri| account_from_uri(uri) }.compact.select(&:local?)
target_statuses_by_account = object_uris.map { |uri| status_from_uri(uri) }.compact.select(&:local?).group_by(&:account_id)
target_accounts = object_uris.filter_map { |uri| account_from_uri(uri) }.select(&:local?)
target_statuses_by_account = object_uris.filter_map { |uri| status_from_uri(uri) }.select(&:local?).group_by(&:account_id)
target_accounts.each do |target_account|
target_statuses = target_statuses_by_account[target_account.id]
next if target_account.suspended?
ReportService.new.call(
@account,
target_account,

View File

@ -6,7 +6,14 @@ class ActivityPub::Activity::Follow < ActivityPub::Activity
def perform
target_account = account_from_uri(object_uri)
return if target_account.nil? || !target_account.local? || delete_arrived_first?(@json['id']) || @account.requested?(target_account)
return if target_account.nil? || !target_account.local? || delete_arrived_first?(@json['id'])
# Update id of already-existing follow requests
existing_follow_request = ::FollowRequest.find_by(account: @account, target_account: target_account)
unless existing_follow_request.nil?
existing_follow_request.update!(uri: @json['id'])
return
end
if target_account.blocking?(@account) || target_account.domain_blocking?(@account.domain) || target_account.moved? || target_account.instance_actor?
reject_follow_request!(target_account)
@ -14,7 +21,9 @@ class ActivityPub::Activity::Follow < ActivityPub::Activity
end
# Fast-forward repeat follow requests
if @account.following?(target_account)
existing_follow = ::Follow.find_by(account: @account, target_account: target_account)
unless existing_follow.nil?
existing_follow.update!(uri: @json['id'])
AuthorizeFollowService.new.call(@account, target_account, skip_follow_request: true, follow_request_uri: @json['id'])
return
end

View File

@ -4,9 +4,8 @@ class ActivityPub::Activity::Move < ActivityPub::Activity
PROCESSING_COOLDOWN = 7.days.seconds
def perform
return if origin_account.uri != object_uri || processed?
mark_as_processing!
return if origin_account.uri != object_uri
return unless mark_as_processing!
target_account = ActivityPub::FetchRemoteAccountService.new.call(target_uri)
@ -35,12 +34,8 @@ class ActivityPub::Activity::Move < ActivityPub::Activity
value_or_id(@json['target'])
end
def processed?
redis.exists?("move_in_progress:#{@account.id}")
end
def mark_as_processing!
redis.setex("move_in_progress:#{@account.id}", PROCESSING_COOLDOWN, true)
redis.set("move_in_progress:#{@account.id}", true, nx: true, ex: PROCESSING_COOLDOWN)
end
def unmark_as_processing!

View File

@ -12,6 +12,10 @@ class ActivityPub::TagManager
public: 'https://www.w3.org/ns/activitystreams#Public',
}.freeze
def public_collection?(uri)
uri == COLLECTIONS[:public] || uri == 'as:Public' || uri == 'Public'
end
def url_for(target)
return target.url if target.respond_to?(:local?) && !target.local?