Fix subscriptions:clear task, refactor feeds, refactor streamable activites
and atom feed generation to some extent, as well as the way mentions are stored
This commit is contained in:
@ -1,6 +1,4 @@
|
||||
class FanOutOnWriteService < BaseService
|
||||
MAX_FEED_SIZE = 800
|
||||
|
||||
# Push a status into home and mentions feeds
|
||||
# @param [Status] status
|
||||
def call(status)
|
||||
@ -17,13 +15,13 @@ class FanOutOnWriteService < BaseService
|
||||
|
||||
def deliver_to_followers(status, replied_to_user)
|
||||
status.account.followers.each do |follower|
|
||||
next if (status.reply? && !(follower.id = replied_to_user.id || follower.following?(replied_to_user))) || !follower.local?
|
||||
next if !follower.local? || FeedManager.filter_status?(status, follower)
|
||||
push(:home, follower.id, status)
|
||||
end
|
||||
end
|
||||
|
||||
def deliver_to_mentioned(status)
|
||||
status.mentioned_accounts.each do |mention|
|
||||
status.mentions.each do |mention|
|
||||
mentioned_account = mention.account
|
||||
next unless mentioned_account.local?
|
||||
push(:mentions, mentioned_account.id, status)
|
||||
@ -31,19 +29,15 @@ class FanOutOnWriteService < BaseService
|
||||
end
|
||||
|
||||
def push(type, receiver_id, status)
|
||||
redis.zadd(key(type, receiver_id), status.id, status.id)
|
||||
redis.zadd(FeedManager.key(type, receiver_id), status.id, status.id)
|
||||
trim(type, receiver_id)
|
||||
end
|
||||
|
||||
def trim(type, receiver_id)
|
||||
return unless redis.zcard(key(type, receiver_id)) > MAX_FEED_SIZE
|
||||
return unless redis.zcard(FeedManager.key(type, receiver_id)) > FeedManager::MAX_ITEMS
|
||||
|
||||
last = redis.zrevrange(key(type, receiver_id), MAX_FEED_SIZE - 1, MAX_FEED_SIZE - 1)
|
||||
redis.zremrangebyscore(key(type, receiver_id), '-inf', "(#{last.last}")
|
||||
end
|
||||
|
||||
def key(type, id)
|
||||
"feed:#{type}:#{id}"
|
||||
last = redis.zrevrange(FeedManager.key(type, receiver_id), FeedManager::MAX_ITEMS - 1, FeedManager::MAX_ITEMS - 1)
|
||||
redis.zremrangebyscore(FeedManager.key(type, receiver_id), '-inf', "(#{last.last}")
|
||||
end
|
||||
|
||||
def redis
|
||||
|
@ -1,34 +1,22 @@
|
||||
class PrecomputeFeedService < BaseService
|
||||
MAX_FEED_SIZE = 800
|
||||
|
||||
# Fill up a user's home/mentions feed from DB and return it
|
||||
# Fill up a user's home/mentions feed from DB and return a subset
|
||||
# @param [Symbol] type :home or :mentions
|
||||
# @param [Account] account
|
||||
# @return [Array]
|
||||
def call(type, account)
|
||||
statuses = send(type.to_s, account).order('created_at desc').limit(MAX_FEED_SIZE)
|
||||
statuses.each { |status| push(type, account.id, status) }
|
||||
statuses
|
||||
def call(type, account, limit)
|
||||
instant_return = []
|
||||
|
||||
Status.send("as_#{type}_timeline", account).order('created_at desc').limit(FeedManager::MAX_ITEMS).each do |status|
|
||||
next if type == :home && FeedManager.filter_status?(status, account)
|
||||
redis.zadd(FeedManager.key(type, receiver_id), status.id, status.id)
|
||||
instant_return << status unless instant_return.size > limit
|
||||
end
|
||||
|
||||
instant_return
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def push(type, receiver_id, status)
|
||||
redis.zadd(key(type, receiver_id), status.id, status.id)
|
||||
end
|
||||
|
||||
def home(account)
|
||||
Status.where(account: [account] + account.following).with_includes.with_counters
|
||||
end
|
||||
|
||||
def mentions(account)
|
||||
Status.where(id: Mention.where(account: account).pluck(:status_id)).with_includes.with_counters
|
||||
end
|
||||
|
||||
def key(type, id)
|
||||
"feed:#{type}:#{id}"
|
||||
end
|
||||
|
||||
def redis
|
||||
$redis
|
||||
end
|
||||
|
@ -4,65 +4,67 @@ class ProcessFeedService < BaseService
|
||||
# @param [Account] account Account this feed belongs to
|
||||
def call(body, account)
|
||||
xml = Nokogiri::XML(body)
|
||||
|
||||
# If we got a full feed, make sure the account's profile is up to date
|
||||
unless xml.at_xpath('/xmlns:feed').nil?
|
||||
update_remote_profile_service.(xml.at_xpath('/xmlns:feed/xmlns:author'), account)
|
||||
end
|
||||
|
||||
# Process entries
|
||||
xml.xpath('//xmlns:entry').each do |entry|
|
||||
next unless [:note, :comment, :activity].include? object_type(entry)
|
||||
|
||||
status = Status.find_by(uri: activity_id(entry))
|
||||
|
||||
# If we already have a post and the verb is now "delete", we gotta delete it and move on!
|
||||
if !status.nil? && verb(entry) == :delete
|
||||
delete_post!(status)
|
||||
next
|
||||
end
|
||||
|
||||
next unless status.nil?
|
||||
|
||||
status = Status.new(uri: activity_id(entry), url: activity_link(entry), account: account, text: content(entry), created_at: published(entry), updated_at: updated(entry))
|
||||
|
||||
if verb(entry) == :share
|
||||
add_reblog!(entry, status)
|
||||
elsif verb(entry) == :post
|
||||
if thread_id(entry).nil?
|
||||
add_post!(entry, status)
|
||||
else
|
||||
add_reply!(entry, status)
|
||||
end
|
||||
end
|
||||
|
||||
# If we added a status, go through accounts it mentions and create respective relations
|
||||
unless status.new_record?
|
||||
entry.xpath('./xmlns:link[@rel="mentioned"]').each do |mention_link|
|
||||
# Here we have to do a reverse lookup of local accounts by their URL!
|
||||
# It's not pretty at all! I really wish all these protocols sticked to
|
||||
# using acct:username@domain only! It would make things so much easier
|
||||
# and tidier
|
||||
|
||||
href = Addressable::URI.parse(mention_link.attribute('href').value)
|
||||
|
||||
if href.host == Rails.configuration.x.local_domain
|
||||
mentioned_account = Account.find_local(href.path.gsub('/users/', ''))
|
||||
|
||||
unless mentioned_account.nil?
|
||||
mentioned_account.mentions.where(status: status).first_or_create(status: status)
|
||||
NotificationMailer.mention(mentioned_account, status).deliver_later
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
fan_out_on_write_service.(status)
|
||||
end
|
||||
end
|
||||
update_remote_profile_service.(xml.at_xpath('/xmlns:feed/xmlns:author'), account) unless xml.at_xpath('/xmlns:feed').nil?
|
||||
xml.xpath('//xmlns:entry').each { |entry| process_entry(account, entry) }
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_entry(account, entry)
|
||||
return unless [:note, :comment, :activity].include? object_type(entry)
|
||||
|
||||
status = Status.find_by(uri: activity_id(entry))
|
||||
|
||||
# If we already have a post and the verb is now "delete", we gotta delete it and move on!
|
||||
if !status.nil? && verb(entry) == :delete
|
||||
delete_post!(status)
|
||||
return
|
||||
end
|
||||
|
||||
return unless status.nil?
|
||||
|
||||
status = Status.new(uri: activity_id(entry), url: activity_link(entry), account: account, text: content(entry), created_at: published(entry), updated_at: updated(entry))
|
||||
|
||||
if verb(entry) == :share
|
||||
add_reblog!(entry, status)
|
||||
elsif verb(entry) == :post
|
||||
if thread_id(entry).nil?
|
||||
add_post!(entry, status)
|
||||
else
|
||||
add_reply!(entry, status)
|
||||
end
|
||||
end
|
||||
|
||||
# If we added a status, go through accounts it mentions and create respective relations
|
||||
unless status.new_record?
|
||||
record_remote_mentions(status, entry.xpath('./xmlns:link[@rel="mentioned"]'))
|
||||
fan_out_on_write_service.(status)
|
||||
end
|
||||
end
|
||||
|
||||
def record_remote_mentions(status, links)
|
||||
# Here we have to do a reverse lookup of local accounts by their URL!
|
||||
# It's not pretty at all! I really wish all these protocols sticked to
|
||||
# using acct:username@domain only! It would make things so much easier
|
||||
# and tidier
|
||||
|
||||
links.each do |mention_link|
|
||||
href = Addressable::URI.parse(mention_link.attribute('href').value)
|
||||
|
||||
if href.host == Rails.configuration.x.local_domain
|
||||
# A local user is mentioned
|
||||
mentioned_account = Account.find_local(href.path.gsub('/users/', ''))
|
||||
|
||||
unless mentioned_account.nil?
|
||||
mentioned_account.mentions.where(status: status).first_or_create(status: status)
|
||||
NotificationMailer.mention(mentioned_account, status).deliver_later
|
||||
end
|
||||
else
|
||||
# What to do about remote user?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def add_post!(_entry, status)
|
||||
status.save!
|
||||
end
|
||||
|
@ -18,7 +18,7 @@ class ProcessMentionsService < BaseService
|
||||
mentioned_account.mentions.where(status: status).first_or_create(status: status)
|
||||
end
|
||||
|
||||
status.mentioned_accounts.each do |mention|
|
||||
status.mentions.each do |mention|
|
||||
mentioned_account = mention.account
|
||||
|
||||
if mentioned_account.local?
|
||||
|
Reference in New Issue
Block a user