From 5acca2a73514ea0758602c855ebd2ea73fd7ffe6 Mon Sep 17 00:00:00 2001 From: tsmethurst Date: Thu, 3 Jun 2021 10:57:24 +0200 Subject: [PATCH] bit more progress --- internal/db/db.go | 2 +- internal/db/pg/pg.go | 7 +- internal/federation/federatingdb/create.go | 11 ++- internal/federation/federatingdb/followers.go | 2 +- internal/gtsmodel/messages.go | 8 +- internal/processing/fromcommon.go | 10 +++ internal/processing/fromfederator.go | 5 ++ internal/processing/timeline/manager.go | 84 +++++++++---------- internal/processing/timeline/postindex.go | 2 +- internal/processing/timeline/timeline.go | 64 ++++++-------- internal/typeutils/internaltofrontend.go | 2 +- 11 files changed, 99 insertions(+), 98 deletions(-) diff --git a/internal/db/db.go b/internal/db/db.go index 6eec1a8..6f2edb9 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -143,7 +143,7 @@ type DB interface { // GetFollowersByAccountID is a shortcut for the common action of fetching a list of accounts that accountID is followed by. // The given slice 'followers' will be set to the result of the query, whatever it is. // In case of no entries, a 'no entries' error will be returned - GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow) error + GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error // GetFavesByAccountID is a shortcut for the common action of fetching a list of faves made by the given accountID. // The given slice 'faves' will be set to the result of the query, whatever it is. 
diff --git a/internal/db/pg/pg.go b/internal/db/pg/pg.go index c483e20..1d27572 100644 --- a/internal/db/pg/pg.go +++ b/internal/db/pg/pg.go @@ -459,7 +459,12 @@ func (ps *postgresService) GetFollowingByAccountID(accountID string, following * return nil } -func (ps *postgresService) GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow) error { +func (ps *postgresService) GetFollowersByAccountID(accountID string, followers *[]gtsmodel.Follow, localOnly bool) error { + + q := ps.conn.Model(followers).Where("target_account_id = ?", accountID) + + + if err := ps.conn.Model(followers).Where("target_account_id = ?", accountID).Select(); err != nil { if err == pg.ErrNoRows { return nil diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 026674e..d2a060e 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -101,16 +101,19 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error { } if err := f.db.Put(status); err != nil { if _, ok := err.(db.ErrAlreadyExists); ok { + // the status already exists in the database, which means we've already handled everything else, + // so we can just return nil here and be done with it. 
return nil } + // an actual error has happened return fmt.Errorf("database error inserting status: %s", err) } fromFederatorChan <- gtsmodel.FromFederator{ - APObjectType: gtsmodel.ActivityStreamsNote, - APActivityType: gtsmodel.ActivityStreamsCreate, - GTSModel: status, - ReceivingAccount: targetAcct, + APObjectType: gtsmodel.ActivityStreamsNote, + APActivityType: gtsmodel.ActivityStreamsCreate, + GTSModel: status, + ReceivingAccount: targetAcct, } } } diff --git a/internal/federation/federatingdb/followers.go b/internal/federation/federatingdb/followers.go index 7cba101..28e6e9d 100644 --- a/internal/federation/federatingdb/followers.go +++ b/internal/federation/federatingdb/followers.go @@ -43,7 +43,7 @@ func (f *federatingDB) Followers(c context.Context, actorIRI *url.URL) (follower } acctFollowers := []gtsmodel.Follow{} - if err := f.db.GetFollowersByAccountID(acct.ID, &acctFollowers); err != nil { + if err := f.db.GetFollowersByAccountID(acct.ID, &acctFollowers, false); err != nil { return nil, fmt.Errorf("db error getting followers for account id %s: %s", acct.ID, err) } diff --git a/internal/gtsmodel/messages.go b/internal/gtsmodel/messages.go index 910c748..d573441 100644 --- a/internal/gtsmodel/messages.go +++ b/internal/gtsmodel/messages.go @@ -25,8 +25,8 @@ type FromClientAPI struct { // FromFederator wraps a message that travels from the federator into the processor type FromFederator struct { - APObjectType string - APActivityType string - GTSModel interface{} - ReceivingAccount *Account + APObjectType string + APActivityType string + GTSModel interface{} + ReceivingAccount *Account } diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index bdb2a59..71a0bba 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -211,3 +211,13 @@ func (p *processor) notifyAnnounce(status *gtsmodel.Status) error { return nil } + +func (p *processor) timelineStatus(status *gtsmodel.Status) error { + 
followers := &[]gtsmodel.Follow{} + if err := p.db.GetFollowersByAccountID(status.AccountID, followers); err != nil { + + } + + // filter out so we only have the local ones + localFollowers := &[]gtsmodel.Account{} +} \ No newline at end of file diff --git a/internal/processing/fromfederator.go b/internal/processing/fromfederator.go index 479bdec..480e8fd 100644 --- a/internal/processing/fromfederator.go +++ b/internal/processing/fromfederator.go @@ -56,9 +56,14 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er return fmt.Errorf("error updating dereferenced status in the db: %s", err) } + if err := p.timelineStatus(incomingStatus); err != nil { + return err + } + if err := p.notifyStatus(incomingStatus); err != nil { return err } + case gtsmodel.ActivityStreamsProfile: // CREATE AN ACCOUNT incomingAccount, ok := federatorMsg.GTSModel.(*gtsmodel.Account) diff --git a/internal/processing/timeline/manager.go b/internal/processing/timeline/manager.go index 6758047..5aff388 100644 --- a/internal/processing/timeline/manager.go +++ b/internal/processing/timeline/manager.go @@ -34,15 +34,37 @@ const ( desiredPostIndexLength = 400 ) +// Manager abstracts functions for creating timelines for multiple accounts, and adding, removing, and fetching entries from those timelines. +// +// By the time a status hits the manager interface, it should already have been filtered and it should be established that the status indeed +// belongs in the home timeline of the given account ID. +// +// The manager makes a distinction between *indexed* posts and *prepared* posts. +// +// Indexed posts consist of just that post's ID (in the database) and the time it was created. An indexed post takes up very little memory, so +// it's not a huge priority to keep trimming the indexed posts list. +// +// Prepared posts consist of the post's database ID, the time it was created, AND the apimodel representation of that post, for quick serialization. 
+// Prepared posts of course take up more memory than indexed posts, so they should be regularly pruned if they're not being actively served. type Manager interface { + // Ingest takes one status and indexes it into the timeline for the given account ID. + // + // It should already be established before calling this function that the status/post actually belongs in the timeline! Ingest(status *gtsmodel.Status, timelineAccountID string) error + // HomeTimeline returns limit n amount of entries from the home timeline of the given account ID, in descending chronological order. + // If maxID is provided, it will return entries from that maxID onwards, inclusive. HomeTimeline(timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) + // GetIndexedLength returns the amount of posts/statuses that have been *indexed* for the given account ID. GetIndexedLength(timelineAccountID string) int + // GetDesiredIndexLength returns the amount of posts that we, ideally, index for each user. GetDesiredIndexLength() int + // GetOldestIndexedID returns the status ID for the oldest post that we have indexed for the given account. GetOldestIndexedID(timelineAccountID string) (string, error) + // PrepareXFromTop prepares limit n amount of posts, based on their indexed representations, from the top of the index. PrepareXFromTop(timelineAccountID string, limit int) error } +// NewManager returns a new timeline manager with the given database, typeconverter, config, and log. 
func NewManager(db db.DB, tc typeutils.TypeConverter, config *config.Config, log *logrus.Logger) Manager { return &manager{ accountTimelines: sync.Map{}, @@ -68,17 +90,9 @@ func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) erro "statusID": status.ID, }) - var t Timeline - i, ok := m.accountTimelines.Load(timelineAccountID) - if !ok { - t = NewTimeline(timelineAccountID, m.db, m.tc) - m.accountTimelines.Store(timelineAccountID, t) - } else { - t = i.(Timeline) - } + t := m.getOrCreateTimeline(timelineAccountID) l.Trace("ingesting status") - return t.IndexOne(status.CreatedAt, status.ID) } @@ -89,17 +103,9 @@ func (m *manager) Remove(statusID string, timelineAccountID string) error { "statusID": statusID, }) - var t Timeline - i, ok := m.accountTimelines.Load(timelineAccountID) - if !ok { - t = NewTimeline(timelineAccountID, m.db, m.tc) - m.accountTimelines.Store(timelineAccountID, t) - } else { - t = i.(Timeline) - } + t := m.getOrCreateTimeline(timelineAccountID) l.Trace("removing status") - return t.Remove(statusID) } @@ -109,19 +115,12 @@ func (m *manager) HomeTimeline(timelineAccountID string, maxID string, sinceID s "timelineAccountID": timelineAccountID, }) - var t Timeline - i, ok := m.accountTimelines.Load(timelineAccountID) - if !ok { - t = NewTimeline(timelineAccountID, m.db, m.tc) - m.accountTimelines.Store(timelineAccountID, t) - } else { - t = i.(Timeline) - } + t := m.getOrCreateTimeline(timelineAccountID) var err error var statuses []*apimodel.Status if maxID != "" { - statuses, err = t.GetXBehindID(limit, maxID) + statuses, err = t.GetXFromID(limit, maxID) } else { statuses, err = t.GetXFromTop(limit) } @@ -133,14 +132,7 @@ func (m *manager) HomeTimeline(timelineAccountID string, maxID string, sinceID s } func (m *manager) GetIndexedLength(timelineAccountID string) int { - var t Timeline - i, ok := m.accountTimelines.Load(timelineAccountID) - if !ok { - t = NewTimeline(timelineAccountID, m.db, m.tc) - 
m.accountTimelines.Store(timelineAccountID, t) - } else { - t = i.(Timeline) - } + t := m.getOrCreateTimeline(timelineAccountID) return t.PostIndexLength() } @@ -150,27 +142,29 @@ func (m *manager) GetDesiredIndexLength() int { } func (m *manager) GetOldestIndexedID(timelineAccountID string) (string, error) { - var t Timeline - i, ok := m.accountTimelines.Load(timelineAccountID) - if !ok { - t = NewTimeline(timelineAccountID, m.db, m.tc) - m.accountTimelines.Store(timelineAccountID, t) - } else { - t = i.(Timeline) - } + t := m.getOrCreateTimeline(timelineAccountID) return t.OldestIndexedPostID() } func (m *manager) PrepareXFromTop(timelineAccountID string, limit int) error { + t := m.getOrCreateTimeline(timelineAccountID) + + return t.PrepareXFromTop(limit) +} + +func (m *manager) getOrCreateTimeline(timelineAccountID string) Timeline { var t Timeline i, ok := m.accountTimelines.Load(timelineAccountID) if !ok { t = NewTimeline(timelineAccountID, m.db, m.tc) m.accountTimelines.Store(timelineAccountID, t) } else { - t = i.(Timeline) + t, ok = i.(Timeline) + if !ok { + panic("couldn't parse entry as Timeline, this should never happen so panic") + } } - return t.PrepareXFromTop(limit) + return t } diff --git a/internal/processing/timeline/postindex.go b/internal/processing/timeline/postindex.go index 2766bfc..86afcb5 100644 --- a/internal/processing/timeline/postindex.go +++ b/internal/processing/timeline/postindex.go @@ -15,7 +15,7 @@ type postIndexEntry struct { statusID string } -func (p *postIndex) index(i *postIndexEntry) error { +func (p *postIndex) insertIndexed(i *postIndexEntry) error { if p.data == nil { p.data = &list.List{} diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 19f55db..8844053 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -32,20 +32,18 @@ import ( ) const ( - fromLatest = "FROM_LATEST" preparedPostsMaxLength = desiredPostIndexLength ) 
type Timeline interface { // GetXFromTop returns x amount of posts from the top of the timeline, from newest to oldest. GetXFromTop(amount int) ([]*apimodel.Status, error) - // GetXFromTop returns x amount of posts from the given id onwards, from newest to oldest. - GetXBehindID(amount int, fromID string) ([]*apimodel.Status, error) + // GetXFromID returns x amount of posts from the given id onwards, from newest to oldest. + // This will include the status with the given ID. + GetXFromID(amount int, fromID string) ([]*apimodel.Status, error) // IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property. IndexOne(statusCreatedAt time.Time, statusID string) error - // IndexMany instructs the timeline to index all the given posts. - IndexMany([]*apimodel.Status) error // Remove removes a status from the timeline. Remove(statusID string) error // OldestIndexedPostID returns the id of the rearmost (ie., the oldest) indexed post, or an error if something goes wrong. 
@@ -111,7 +109,6 @@ func (t *timeline) PrepareXFromIndex(amount int, index int) error { prepared = prepared + 1 if prepared >= amount { // we're done - fmt.Printf("\n\n\nprepared %d entries\n\n\n", prepared) break } } @@ -180,7 +177,7 @@ func (t *timeline) GetXFromTop(amount int) ([]*apimodel.Status, error) { return statuses, nil } -func (t *timeline) GetXBehindID(amount int, fromID string) ([]*apimodel.Status, error) { +func (t *timeline) GetXFromID(amount int, fromID string) ([]*apimodel.Status, error) { // make a slice of statuses with the length we need to return statuses := make([]*apimodel.Status, 0, amount) @@ -197,15 +194,13 @@ func (t *timeline) GetXBehindID(amount int, fromID string) ([]*apimodel.Status, return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry") } if entry.statusID == fromID { - fmt.Printf("\n\n\nfromid %s is at position %d\n\n\n", fromID, position) break } position = position + 1 } - // make sure we have enough posts prepared to return + // make sure we have enough posts prepared behind it to return what we're being asked for if t.preparedPosts.data.Len() < amount+position { - if err := t.PrepareXFromIndex(amount, position); err != nil { return nil, err } @@ -221,14 +216,14 @@ func (t *timeline) GetXBehindID(amount int, fromID string) ([]*apimodel.Status, } if !serving { - // we're not serving yet but we might on the next time round if we hit our from id + // start serving if we've hit the id we're looking for if entry.statusID == fromID { - fmt.Printf("\n\n\nwe've hit fromid %s at position %d, will now serve\n\n\n", fromID, position) serving = true - continue } - } else { - // we're serving now! 
+ } + + if serving { + // serve up to the amount requested statuses = append(statuses, entry.prepared) served = served + 1 if served >= amount { @@ -248,7 +243,8 @@ func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error { createdAt: statusCreatedAt, statusID: statusID, } - return t.postIndex.index(postIndexEntry) + + return t.postIndex.insertIndexed(postIndexEntry) } func (t *timeline) Remove(statusID string) error { @@ -263,7 +259,7 @@ func (t *timeline) Remove(statusID string) error { } if entry.statusID == statusID { t.postIndex.data.Remove(e) - break // bail once we found it + break // bail once we found and removed it } } @@ -275,34 +271,13 @@ func (t *timeline) Remove(statusID string) error { } if entry.statusID == statusID { t.preparedPosts.data.Remove(e) - break // bail once we found it + break // bail once we found and removed it } } return nil } -func (t *timeline) IndexMany(statuses []*apimodel.Status) error { - t.Lock() - defer t.Unlock() - - // add statuses to the index - for _, s := range statuses { - createdAt, err := time.Parse(s.CreatedAt, time.RFC3339) - if err != nil { - return fmt.Errorf("IndexMany: could not parse time %s on status id %s: %s", s.CreatedAt, s.ID, err) - } - postIndexEntry := &postIndexEntry{ - createdAt: createdAt, - statusID: s.ID, - } - if err := t.postIndex.index(postIndexEntry); err != nil { - return err - } - } - return nil -} - func (t *timeline) Reset() error { return nil } @@ -318,11 +293,14 @@ func (t *timeline) PostIndexLength() int { func (t *timeline) OldestIndexedPostID() (string, error) { var id string if t.postIndex == nil || t.postIndex.data == nil { + // return an empty string if postindex hasn't been initialized yet return id, nil } + e := t.postIndex.data.Back() if e == nil { + // return an empty string if there's no back entry (ie., the index list hasn't been initialized yet) return id, nil } @@ -330,16 +308,18 @@ func (t *timeline) OldestIndexedPostID() (string, error) { if !ok { 
return id, errors.New("OldestIndexedPostID: could not parse e as a postIndexEntry") } - return entry.statusID, nil } func (t *timeline) prepare(statusID string) error { + + // start by getting the status out of the database according to its indexed ID gtsStatus := &gtsmodel.Status{} if err := t.db.GetByID(statusID, gtsStatus); err != nil { return err } + // if the account pointer hasn't been set on this timeline already, set it lazily here if t.account == nil { timelineOwnerAccount := &gtsmodel.Account{} if err := t.db.GetByID(t.accountID, timelineOwnerAccount); err != nil { @@ -348,11 +328,13 @@ func (t *timeline) prepare(statusID string) error { t.account = timelineOwnerAccount } + // to convert the status we need relevant accounts from it, so pull them out here relevantAccounts, err := t.db.PullRelevantAccountsFromStatus(gtsStatus) if err != nil { return err } + // check if this is a boost... var reblogOfStatus *gtsmodel.Status if gtsStatus.BoostOfID != "" { s := &gtsmodel.Status{} @@ -362,11 +344,13 @@ func (t *timeline) prepare(statusID string) error { reblogOfStatus = s } + // serialize the status (or, at least, convert it to a form that's ready to be serialized) apiModelStatus, err := t.tc.StatusToMasto(gtsStatus, relevantAccounts.StatusAuthor, t.account, relevantAccounts.BoostedAccount, relevantAccounts.ReplyToAccount, reblogOfStatus) if err != nil { return err } + // shove it in prepared posts as a prepared posts entry preparedPostsEntry := &preparedPostsEntry{ createdAt: gtsStatus.CreatedAt, statusID: statusID, diff --git a/internal/typeutils/internaltofrontend.go b/internal/typeutils/internaltofrontend.go index de3b94e..1c283e9 100644 --- a/internal/typeutils/internaltofrontend.go +++ b/internal/typeutils/internaltofrontend.go @@ -64,7 +64,7 @@ func (c *converter) AccountToMastoSensitive(a *gtsmodel.Account) (*model.Account func (c *converter) AccountToMastoPublic(a *gtsmodel.Account) (*model.Account, error) { // count followers followers := 
[]gtsmodel.Follow{} - if err := c.db.GetFollowersByAccountID(a.ID, &followers); err != nil { + if err := c.db.GetFollowersByAccountID(a.ID, &followers, false); err != nil { if _, ok := err.(db.ErrNoEntries); !ok { return nil, fmt.Errorf("error getting followers: %s", err) }