update timeline with new posts as they come in

This commit is contained in:
tsmethurst 2021-06-03 18:11:25 +02:00
parent 5acca2a735
commit ab7db633d0
8 changed files with 200 additions and 16 deletions

View File

@ -143,6 +143,8 @@ type DB interface {
// GetFollowersByAccountID is a shortcut for the common action of fetching a list of accounts that accountID is followed by. // GetFollowersByAccountID is a shortcut for the common action of fetching a list of accounts that accountID is followed by.
// The given slice 'followers' will be set to the result of the query, whatever it is. // The given slice 'followers' will be set to the result of the query, whatever it is.
// In case of no entries, a 'no entries' error will be returned // In case of no entries, a 'no entries' error will be returned
//
// If localOnly is set to true, then only followers from *this instance* will be returned.
GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error
// GetFavesByAccountID is a shortcut for the common action of fetching a list of faves made by the given accountID. // GetFavesByAccountID is a shortcut for the common action of fetching a list of faves made by the given accountID.

View File

@ -461,11 +461,18 @@ func (ps *postgresService) GetFollowingByAccountID(accountID string, following *
func (ps *postgresService) GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error { func (ps *postgresService) GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error {
q := ps.conn.Model(followers).Where("target_account_id = ?", accountID) q := ps.conn.Model(followers)
if localOnly {
q = q.ColumnExpr("follow.*").
Join("JOIN accounts AS a ON follow.account_id = TEXT(a.id)").
Where("follow.target_account_id = ?", accountID).
Where("? IS NULL", pg.Ident("a.domain"))
} else {
q = q.Where("target_account_id = ?", accountID)
}
if err := q.Select(); err != nil {
if err := ps.conn.Model(followers).Where("target_account_id = ?", accountID).Select(); err != nil {
if err == pg.ErrNoRows { if err == pg.ErrNoRows {
return nil return nil
} }
@ -802,7 +809,6 @@ func (ps *postgresService) StatusVisible(targetStatus *gtsmodel.Status, targetAc
// If requesting account is nil, that means whoever requested the status didn't auth, or their auth failed. // If requesting account is nil, that means whoever requested the status didn't auth, or their auth failed.
// In this case, we can still serve the status if it's public, otherwise we definitely shouldn't. // In this case, we can still serve the status if it's public, otherwise we definitely shouldn't.
if requestingAccount == nil { if requestingAccount == nil {
if targetStatus.Visibility == gtsmodel.VisibilityPublic { if targetStatus.Visibility == gtsmodel.VisibilityPublic {
return true, nil return true, nil
} }
@ -862,6 +868,18 @@ func (ps *postgresService) StatusVisible(targetStatus *gtsmodel.Status, targetAc
l.Debug("a block exists between requesting account and reply to account") l.Debug("a block exists between requesting account and reply to account")
return false, nil return false, nil
} }
// check reply to ID
if targetStatus.InReplyToID != "" {
followsRepliedAccount, err := ps.Follows(requestingAccount, relevantAccounts.ReplyToAccount)
if err != nil {
return false, err
}
if !followsRepliedAccount {
l.Debug("target status is a followers-only reply to an account that is not followed by the requesting account")
return false, nil
}
}
} }
// status boosts accounts id // status boosts accounts id

View File

@ -278,7 +278,7 @@ func (p *processor) AccountFollowersGet(authed *oauth.Auth, targetAccountID stri
followers := []gtsmodel.Follow{} followers := []gtsmodel.Follow{}
accounts := []apimodel.Account{} accounts := []apimodel.Account{}
if err := p.db.GetFollowersByAccountID(targetAccountID, &followers); err != nil { if err := p.db.GetFollowersByAccountID(targetAccountID, &followers, false); err != nil {
if _, ok := err.(db.ErrNoEntries); ok { if _, ok := err.(db.ErrNoEntries); ok {
return accounts, nil return accounts, nil
} }

View File

@ -40,6 +40,10 @@ func (p *processor) processFromClientAPI(clientMsg gtsmodel.FromClientAPI) error
return errors.New("note was not parseable as *gtsmodel.Status") return errors.New("note was not parseable as *gtsmodel.Status")
} }
if err := p.timelineStatus(status); err != nil {
return err
}
if err := p.notifyStatus(status); err != nil { if err := p.notifyStatus(status); err != nil {
return err return err
} }
@ -47,7 +51,6 @@ func (p *processor) processFromClientAPI(clientMsg gtsmodel.FromClientAPI) error
if status.VisibilityAdvanced != nil && status.VisibilityAdvanced.Federated { if status.VisibilityAdvanced != nil && status.VisibilityAdvanced.Federated {
return p.federateStatus(status) return p.federateStatus(status)
} }
return nil
case gtsmodel.ActivityStreamsFollow: case gtsmodel.ActivityStreamsFollow:
// CREATE FOLLOW REQUEST // CREATE FOLLOW REQUEST
followRequest, ok := clientMsg.GTSModel.(*gtsmodel.FollowRequest) followRequest, ok := clientMsg.GTSModel.(*gtsmodel.FollowRequest)

View File

@ -20,6 +20,8 @@ package processing
import ( import (
"fmt" "fmt"
"strings"
"sync"
"github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
@ -213,11 +215,88 @@ func (p *processor) notifyAnnounce(status *gtsmodel.Status) error {
} }
func (p *processor) timelineStatus(status *gtsmodel.Status) error { func (p *processor) timelineStatus(status *gtsmodel.Status) error {
followers := &[]gtsmodel.Follow{} // make sure the author account is pinned onto the status
if err := p.db.GetFollowersByAccountID(status.AccountID, followers); err != nil { if status.GTSAuthorAccount == nil {
a := &gtsmodel.Account{}
if err := p.db.GetByID(status.AccountID, a); err != nil {
return fmt.Errorf("timelineStatus: error getting author account with id %s: %s", status.AccountID, err)
}
status.GTSAuthorAccount = a
} }
// filter out so we only have the local ones // get all relevant accounts here once
localFollowers := &[]gtsmodel.Account{} relevantAccounts, err := p.db.PullRelevantAccountsFromStatus(status)
} if err != nil {
return fmt.Errorf("timelineStatus: error getting relevant accounts from status: %s", err)
}
// get local followers of the account that posted the status
followers := []gtsmodel.Follow{}
if err := p.db.GetFollowersByAccountID(status.AccountID, &followers, true); err != nil {
return fmt.Errorf("timelineStatus: error getting followers for account id %s: %s", status.AccountID, err)
}
// if the poster is local, add a fake entry for them to the followers list so they can see their own status in their timeline
if status.GTSAuthorAccount.Domain == "" {
followers = append(followers, gtsmodel.Follow{
AccountID: status.AccountID,
})
}
wg := sync.WaitGroup{}
wg.Add(len(followers))
errors := make(chan error, len(followers))
for _, f := range followers {
go p.timelineStatusForAccount(status, f.AccountID, relevantAccounts, errors, &wg)
}
// read any errors that come in from the async functions
errs := []string{}
go func() {
for range errors {
e := <-errors
if e != nil {
errs = append(errs, e.Error())
}
}
}()
// wait til all functions have returned and then close the error channel
wg.Wait()
close(errors)
if len(errs) != 0 {
// we have some errors
return fmt.Errorf("timelineStatus: one or more errors timelining statuses: %s", strings.Join(errs, ";"))
}
// no errors, nice
return nil
}
func (p *processor) timelineStatusForAccount(status *gtsmodel.Status, accountID string, relevantAccounts *gtsmodel.RelevantAccounts, errors chan error, wg *sync.WaitGroup) {
defer wg.Done()
// get the targetAccount
timelineAccount := &gtsmodel.Account{}
if err := p.db.GetByID(accountID, timelineAccount); err != nil {
errors <- fmt.Errorf("timelineStatus: error getting account for timeline with id %s: %s", accountID, err)
return
}
// make sure the status is visible
visible, err := p.db.StatusVisible(status, status.GTSAuthorAccount, timelineAccount, relevantAccounts)
if err != nil {
errors <- fmt.Errorf("timelineStatus: error getting visibility for status for timeline with id %s: %s", accountID, err)
return
}
if !visible {
return
}
if err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID); err != nil {
errors <- fmt.Errorf("initTimelineFor: error ingesting status %s: %s", status.ID, err)
}
}

View File

@ -137,6 +137,8 @@ type Processor interface {
// PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. // PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters.
PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode)
/* /*
FEDERATION API-FACING PROCESSING FUNCTIONS FEDERATION API-FACING PROCESSING FUNCTIONS
These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply

View File

@ -51,6 +51,10 @@ type Manager interface {
// //
// It should already be established before calling this function that the status/post actually belongs in the timeline! // It should already be established before calling this function that the status/post actually belongs in the timeline!
Ingest(status *gtsmodel.Status, timelineAccountID string) error Ingest(status *gtsmodel.Status, timelineAccountID string) error
// IngestAndPrepare takes one status and indexes it into the timeline for the given account ID, and then immediately prepares it for serving.
//
// It should already be established before calling this function that the status/post actually belongs in the timeline!
IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) error
// HomeTimeline returns limit n amount of entries from the home timeline of the given account ID, in descending chronological order. // HomeTimeline returns limit n amount of entries from the home timeline of the given account ID, in descending chronological order.
// If maxID is provided, it will return entries from that maxID onwards, inclusive. // If maxID is provided, it will return entries from that maxID onwards, inclusive.
HomeTimeline(timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) HomeTimeline(timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error)
@ -96,6 +100,19 @@ func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) erro
return t.IndexOne(status.CreatedAt, status.ID) return t.IndexOne(status.CreatedAt, status.ID)
} }
func (m *manager) IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) error {
l := m.log.WithFields(logrus.Fields{
"func": "IngestAndPrepare",
"timelineAccountID": timelineAccountID,
"statusID": status.ID,
})
t := m.getOrCreateTimeline(timelineAccountID)
l.Trace("ingesting status")
return t.IndexAndPrepareOne(status.CreatedAt, status.ID)
}
func (m *manager) Remove(statusID string, timelineAccountID string) error { func (m *manager) Remove(statusID string, timelineAccountID string) error {
l := m.log.WithFields(logrus.Fields{ l := m.log.WithFields(logrus.Fields{
"func": "Remove", "func": "Remove",
@ -120,7 +137,9 @@ func (m *manager) HomeTimeline(timelineAccountID string, maxID string, sinceID s
var err error var err error
var statuses []*apimodel.Status var statuses []*apimodel.Status
if maxID != "" { if maxID != "" {
statuses, err = t.GetXFromID(limit, maxID) statuses, err = t.GetXFromIDOnwards(limit, maxID)
} else if sinceID != "" {
statuses, err = t.GetXBeforeID(limit, sinceID)
} else { } else {
statuses, err = t.GetXFromTop(limit) statuses, err = t.GetXFromTop(limit)
} }

View File

@ -38,12 +38,19 @@ const (
type Timeline interface { type Timeline interface {
// GetXFromTop returns x amount of posts from the top of the timeline, from newest to oldest. // GetXFromTop returns x amount of posts from the top of the timeline, from newest to oldest.
GetXFromTop(amount int) ([]*apimodel.Status, error) GetXFromTop(amount int) ([]*apimodel.Status, error)
// GetXFromID returns x amount of posts from the given id onwards, from newest to oldest. // GetXFromIDOnwards returns x amount of posts from the given id onwards, from newest to oldest.
// This will include the status with the given ID. // This will include the status with the given ID.
GetXFromID(amount int, fromID string) ([]*apimodel.Status, error) GetXFromIDOnwards(amount int, fromID string) ([]*apimodel.Status, error)
// GetXBeforeID returns x amount of posts up to the given id, from newest to oldest.
// This will NOT include the status with the given ID.
GetXBeforeID(amount int, sinceID string) ([]*apimodel.Status, error)
// IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property. // IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property.
IndexOne(statusCreatedAt time.Time, statusID string) error IndexOne(statusCreatedAt time.Time, statusID string) error
// IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property,
// and then immediately prepares it.
IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) error
// Remove removes a status from the timeline. // Remove removes a status from the timeline.
Remove(statusID string) error Remove(statusID string) error
// OldestIndexedPostID returns the id of the rearmost (ie., the oldest) indexed post, or an error if something goes wrong. // OldestIndexedPostID returns the id of the rearmost (ie., the oldest) indexed post, or an error if something goes wrong.
@ -177,7 +184,7 @@ func (t *timeline) GetXFromTop(amount int) ([]*apimodel.Status, error) {
return statuses, nil return statuses, nil
} }
func (t *timeline) GetXFromID(amount int, fromID string) ([]*apimodel.Status, error) { func (t *timeline) GetXFromIDOnwards(amount int, fromID string) ([]*apimodel.Status, error) {
// make a slice of statuses with the length we need to return // make a slice of statuses with the length we need to return
statuses := make([]*apimodel.Status, 0, amount) statuses := make([]*apimodel.Status, 0, amount)
@ -235,6 +242,40 @@ func (t *timeline) GetXFromID(amount int, fromID string) ([]*apimodel.Status, er
return statuses, nil return statuses, nil
} }
func (t *timeline) GetXBeforeID(amount int, beforeID string) ([]*apimodel.Status, error) {
// make a slice of statuses with the length we need to return
statuses := make([]*apimodel.Status, 0, amount)
// if there are no prepared posts, just return the empty slice
if t.preparedPosts.data == nil {
t.preparedPosts.data = &list.List{}
}
// iterate through the modified list until we hit the fromID again
var served int
servloop:
for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*preparedPostsEntry)
if !ok {
return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry")
}
if entry.statusID == beforeID {
// we're good
break servloop
}
// serve up to the amount requested
statuses = append(statuses, entry.prepared)
served = served + 1
if served >= amount {
break
}
}
return statuses, nil
}
func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error { func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -247,6 +288,26 @@ func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error {
return t.postIndex.insertIndexed(postIndexEntry) return t.postIndex.insertIndexed(postIndexEntry)
} }
func (t *timeline) IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) error {
t.Lock()
defer t.Unlock()
postIndexEntry := &postIndexEntry{
createdAt: statusCreatedAt,
statusID: statusID,
}
if err := t.postIndex.insertIndexed(postIndexEntry); err != nil {
return fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err)
}
if err := t.prepare(statusID); err != nil {
return fmt.Errorf("IndexAndPrepareOne: error preparing: %s", err)
}
return nil
}
func (t *timeline) Remove(statusID string) error { func (t *timeline) Remove(statusID string) error {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()