diff --git a/internal/db/db.go b/internal/db/db.go index 6f2edb9..af0eb89 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -143,6 +143,8 @@ type DB interface { // GetFollowersByAccountID is a shortcut for the common action of fetching a list of accounts that accountID is followed by. // The given slice 'followers' will be set to the result of the query, whatever it is. // In case of no entries, a 'no entries' error will be returned + // + // If localOnly is set to true, then only followers from *this instance* will be returned. GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error // GetFavesByAccountID is a shortcut for the common action of fetching a list of faves made by the given accountID. diff --git a/internal/db/pg/pg.go b/internal/db/pg/pg.go index 1d27572..5c6720a 100644 --- a/internal/db/pg/pg.go +++ b/internal/db/pg/pg.go @@ -461,11 +461,18 @@ func (ps *postgresService) GetFollowingByAccountID(accountID string, following * func (ps *postgresService) GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error { - q := ps.conn.Model(followers).Where("target_account_id = ?", accountID) + q := ps.conn.Model(followers) + if localOnly { + q = q.ColumnExpr("follow.*"). + Join("JOIN accounts AS a ON follow.account_id = TEXT(a.id)"). + Where("follow.target_account_id = ?", accountID). + Where("? IS NULL", pg.Ident("a.domain")) + } else { + q = q.Where("target_account_id = ?", accountID) + } - - if err := ps.conn.Model(followers).Where("target_account_id = ?", accountID).Select(); err != nil { + if err := q.Select(); err != nil { if err == pg.ErrNoRows { return nil } @@ -802,7 +809,6 @@ func (ps *postgresService) StatusVisible(targetStatus *gtsmodel.Status, targetAc // If requesting account is nil, that means whoever requested the status didn't auth, or their auth failed. // In this case, we can still serve the status if it's public, otherwise we definitely shouldn't. if requestingAccount == nil { - if targetStatus.Visibility == gtsmodel.VisibilityPublic { return true, nil } @@ -862,6 +868,18 @@ func (ps *postgresService) StatusVisible(targetStatus *gtsmodel.Status, targetAc l.Debug("a block exists between requesting account and reply to account") return false, nil } + + // check reply to ID + if targetStatus.InReplyToID != "" { + followsRepliedAccount, err := ps.Follows(requestingAccount, relevantAccounts.ReplyToAccount) + if err != nil { + return false, err + } + if !followsRepliedAccount { + l.Debug("target status is a followers-only reply to an account that is not followed by the requesting account") + return false, nil + } + } } // status boosts accounts id diff --git a/internal/processing/account.go b/internal/processing/account.go index 92ccf10..71faa74 100644 --- a/internal/processing/account.go +++ b/internal/processing/account.go @@ -278,7 +278,7 @@ func (p *processor) AccountFollowersGet(authed *oauth.Auth, targetAccountID stri followers := []gtsmodel.Follow{} accounts := []apimodel.Account{} - if err := p.db.GetFollowersByAccountID(targetAccountID, &followers); err != nil { + if err := p.db.GetFollowersByAccountID(targetAccountID, &followers, false); err != nil { if _, ok := err.(db.ErrNoEntries); ok { return accounts, nil } diff --git a/internal/processing/fromclientapi.go b/internal/processing/fromclientapi.go index 0d8b73e..e7fe28c 100644 --- a/internal/processing/fromclientapi.go +++ b/internal/processing/fromclientapi.go @@ -40,6 +40,10 @@ func (p *processor) processFromClientAPI(clientMsg gtsmodel.FromClientAPI) error return errors.New("note was not parseable as *gtsmodel.Status") } + if err := p.timelineStatus(status); err != nil { + return err + } + if err := p.notifyStatus(status); err != nil { return err } @@ -47,7 +51,6 @@ func (p *processor) processFromClientAPI(clientMsg gtsmodel.FromClientAPI) error if status.VisibilityAdvanced != nil && status.VisibilityAdvanced.Federated { return p.federateStatus(status) } - return nil case gtsmodel.ActivityStreamsFollow: // CREATE FOLLOW REQUEST followRequest, ok := clientMsg.GTSModel.(*gtsmodel.FollowRequest) diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index 71a0bba..5abb767 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -20,6 +20,8 @@ package processing import ( "fmt" + "strings" + "sync" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" @@ -213,11 +215,88 @@ func (p *processor) notifyAnnounce(status *gtsmodel.Status) error { } func (p *processor) timelineStatus(status *gtsmodel.Status) error { - followers := &[]gtsmodel.Follow{} - if err := p.db.GetFollowersByAccountID(status.AccountID, followers); err != nil { - + // make sure the author account is pinned onto the status + if status.GTSAuthorAccount == nil { + a := >smodel.Account{} + if err := p.db.GetByID(status.AccountID, a); err != nil { + return fmt.Errorf("timelineStatus: error getting author account with id %s: %s", status.AccountID, err) + } + status.GTSAuthorAccount = a } - // filter out so we only have the local ones - localFollowers := &[]gtsmodel.Account{} -} \ No newline at end of file + // get all relevant accounts here once + relevantAccounts, err := p.db.PullRelevantAccountsFromStatus(status) + if err != nil { + return fmt.Errorf("timelineStatus: error getting relevant accounts from status: %s", err) + } + + // get local followers of the account that posted the status + followers := []gtsmodel.Follow{} + if err := p.db.GetFollowersByAccountID(status.AccountID, &followers, true); err != nil { + return fmt.Errorf("timelineStatus: error getting followers for account id %s: %s", status.AccountID, err) + } + + // if the poster is local, add a fake entry for them to the followers list so they can see their own status in their timeline + if status.GTSAuthorAccount.Domain == "" { + followers = append(followers, gtsmodel.Follow{ + AccountID: status.AccountID, + }) + } + + wg := sync.WaitGroup{} + wg.Add(len(followers)) + errors := make(chan error, len(followers)) + + for _, f := range followers { + go p.timelineStatusForAccount(status, f.AccountID, relevantAccounts, errors, &wg) + } + + // read any errors that come in from the async functions + errs := []string{} + go func() { + for range errors { + e := <-errors + if e != nil { + errs = append(errs, e.Error()) + } + } + }() + + // wait til all functions have returned and then close the error channel + wg.Wait() + close(errors) + + if len(errs) != 0 { + // we have some errors + return fmt.Errorf("timelineStatus: one or more errors timelining statuses: %s", strings.Join(errs, ";")) + } + + // no errors, nice + return nil +} + +func (p *processor) timelineStatusForAccount(status *gtsmodel.Status, accountID string, relevantAccounts *gtsmodel.RelevantAccounts, errors chan error, wg *sync.WaitGroup) { + defer wg.Done() + + // get the targetAccount + timelineAccount := >smodel.Account{} + if err := p.db.GetByID(accountID, timelineAccount); err != nil { + errors <- fmt.Errorf("timelineStatus: error getting account for timeline with id %s: %s", accountID, err) + return + } + + // make sure the status is visible + visible, err := p.db.StatusVisible(status, status.GTSAuthorAccount, timelineAccount, relevantAccounts) + if err != nil { + errors <- fmt.Errorf("timelineStatus: error getting visibility for status for timeline with id %s: %s", accountID, err) + return + } + + if !visible { + return + } + + if err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID); err != nil { + errors <- fmt.Errorf("initTimelineFor: error ingesting status %s: %s", status.ID, err) + } +} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index e4d860f..e1cd152 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -137,6 +137,8 @@ type Processor interface { // PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) + + /* FEDERATION API-FACING PROCESSING FUNCTIONS These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply diff --git a/internal/processing/timeline/manager.go b/internal/processing/timeline/manager.go index 5aff388..45dd08b 100644 --- a/internal/processing/timeline/manager.go +++ b/internal/processing/timeline/manager.go @@ -51,6 +51,10 @@ type Manager interface { // // It should already be established before calling this function that the status/post actually belongs in the timeline! Ingest(status *gtsmodel.Status, timelineAccountID string) error + // IngestAndPrepare takes one status and indexes it into the timeline for the given account ID, and then immediately prepares it for serving. + // + // It should already be established before calling this function that the status/post actually belongs in the timeline! + IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) error // HomeTimeline returns limit n amount of entries from the home timeline of the given account ID, in descending chronological order. // If maxID is provided, it will return entries from that maxID onwards, inclusive. HomeTimeline(timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) @@ -96,6 +100,19 @@ func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) erro return t.IndexOne(status.CreatedAt, status.ID) } +func (m *manager) IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) error { + l := m.log.WithFields(logrus.Fields{ + "func": "IngestAndPrepare", + "timelineAccountID": timelineAccountID, + "statusID": status.ID, + }) + + t := m.getOrCreateTimeline(timelineAccountID) + + l.Trace("ingesting status") + return t.IndexAndPrepareOne(status.CreatedAt, status.ID) +} + func (m *manager) Remove(statusID string, timelineAccountID string) error { l := m.log.WithFields(logrus.Fields{ "func": "Remove", @@ -120,7 +137,9 @@ func (m *manager) HomeTimeline(timelineAccountID string, maxID string, sinceID s var err error var statuses []*apimodel.Status if maxID != "" { - statuses, err = t.GetXFromID(limit, maxID) + statuses, err = t.GetXFromIDOnwards(limit, maxID) + } else if sinceID != "" { + statuses, err = t.GetXBeforeID(limit, sinceID) } else { statuses, err = t.GetXFromTop(limit) } diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 8844053..9352e79 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -38,12 +38,19 @@ const ( type Timeline interface { // GetXFromTop returns x amount of posts from the top of the timeline, from newest to oldest. GetXFromTop(amount int) ([]*apimodel.Status, error) - // GetXFromID returns x amount of posts from the given id onwards, from newest to oldest. + // GetXFromIDOnwards returns x amount of posts from the given id onwards, from newest to oldest. // This will include the status with the given ID. - GetXFromID(amount int, fromID string) ([]*apimodel.Status, error) + GetXFromIDOnwards(amount int, fromID string) ([]*apimodel.Status, error) + + // GetXBeforeID returns x amount of posts up to the given id, from newest to oldest. + // This will NOT include the status with the given ID. + GetXBeforeID(amount int, sinceID string) ([]*apimodel.Status, error) // IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property. IndexOne(statusCreatedAt time.Time, statusID string) error + // IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property, + // and then immediately prepares it. + IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) error // Remove removes a status from the timeline. Remove(statusID string) error // OldestIndexedPostID returns the id of the rearmost (ie., the oldest) indexed post, or an error if something goes wrong. @@ -177,7 +184,7 @@ func (t *timeline) GetXFromTop(amount int) ([]*apimodel.Status, error) { return statuses, nil } -func (t *timeline) GetXFromID(amount int, fromID string) ([]*apimodel.Status, error) { +func (t *timeline) GetXFromIDOnwards(amount int, fromID string) ([]*apimodel.Status, error) { // make a slice of statuses with the length we need to return statuses := make([]*apimodel.Status, 0, amount) @@ -235,6 +242,40 @@ func (t *timeline) GetXFromID(amount int, fromID string) ([]*apimodel.Status, er return statuses, nil } +func (t *timeline) GetXBeforeID(amount int, beforeID string) ([]*apimodel.Status, error) { + // make a slice of statuses with the length we need to return + statuses := make([]*apimodel.Status, 0, amount) + + // if there are no prepared posts, just return the empty slice + if t.preparedPosts.data == nil { + t.preparedPosts.data = &list.List{} + } + + // iterate through the modified list until we hit the fromID again + var served int +servloop: + for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*preparedPostsEntry) + if !ok { + return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry") + } + + if entry.statusID == beforeID { + // we're good + break servloop + } + + // serve up to the amount requested + statuses = append(statuses, entry.prepared) + served = served + 1 + if served >= amount { + break + } + } + + return statuses, nil +} + func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error { t.Lock() defer t.Unlock() @@ -247,6 +288,26 @@ func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error { return t.postIndex.insertIndexed(postIndexEntry) } +func (t *timeline) IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) error { + t.Lock() + defer t.Unlock() + + postIndexEntry := &postIndexEntry{ + createdAt: statusCreatedAt, + statusID: statusID, + } + + if err := t.postIndex.insertIndexed(postIndexEntry); err != nil { + return fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err) + } + + if err := t.prepare(statusID); err != nil { + return fmt.Errorf("IndexAndPrepareOne: error preparing: %s", err) + } + + return nil +} + func (t *timeline) Remove(statusID string) error { t.Lock() defer t.Unlock()