diff --git a/internal/cliactions/server/server.go b/internal/cliactions/server/server.go index bfb2751..8759895 100644 --- a/internal/cliactions/server/server.go +++ b/internal/cliactions/server/server.go @@ -25,6 +25,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/api/client/search" "github.com/superseriousbusiness/gotosocial/internal/api/client/status" "github.com/superseriousbusiness/gotosocial/internal/api/client/timeline" + timelineprocessing "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" "github.com/superseriousbusiness/gotosocial/internal/api/s2s/user" "github.com/superseriousbusiness/gotosocial/internal/api/s2s/webfinger" "github.com/superseriousbusiness/gotosocial/internal/api/security" @@ -88,13 +89,14 @@ var Start cliactions.GTSAction = func(ctx context.Context, c *config.Config, log // build converters and util typeConverter := typeutils.NewConverter(c, dbService) + timelineManager := timelineprocessing.NewManager(dbService, typeConverter, c, log) // build backend handlers mediaHandler := media.New(c, dbService, storageBackend, log) oauthServer := oauth.New(dbService, log) transportController := transport.NewController(c, &federation.Clock{}, http.DefaultClient, log) federator := federation.NewFederator(dbService, federatingDB, transportController, c, log, typeConverter) - processor := processing.NewProcessor(c, typeConverter, federator, oauthServer, mediaHandler, storageBackend, dbService, log) + processor := processing.NewProcessor(c, typeConverter, federator, oauthServer, mediaHandler, storageBackend, timelineManager, dbService, log) if err := processor.Start(); err != nil { return fmt.Errorf("error starting processor: %s", err) } diff --git a/internal/db/db.go b/internal/db/db.go index 1774420..6eec1a8 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -257,6 +257,8 @@ type DB interface { // This slice will be unfiltered, not taking account of blocks and whatnot, so filter it before serving it 
back to a user. WhoBoostedStatus(status *gtsmodel.Status) ([]*gtsmodel.Account, error) + GetStatusesWhereFollowing(accountID string, limit int, offsetStatusID string) ([]*gtsmodel.Status, error) + // GetHomeTimelineForAccount fetches the account's HOME timeline -- ie., posts and replies from people they *follow*. // It will use the given filters and try to return as many statuses up to the limit as possible. GetHomeTimelineForAccount(accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) diff --git a/internal/db/pg/pg.go b/internal/db/pg/pg.go index c2bb703..c483e20 100644 --- a/internal/db/pg/pg.go +++ b/internal/db/pg/pg.go @@ -223,12 +223,16 @@ func (ps *postgresService) GetWhere(where []db.Where, i interface{}) error { q := ps.conn.Model(i) for _, w := range where { - if w.CaseInsensitive { - q = q.Where("LOWER(?) = LOWER(?)", pg.Safe(w.Key), w.Value) - } else { - q = q.Where("? = ?", pg.Safe(w.Key), w.Value) - } + if w.Value == nil { + q = q.Where("? IS NULL", pg.Ident(w.Key)) + } else { + if w.CaseInsensitive { + q = q.Where("LOWER(?) = LOWER(?)", pg.Safe(w.Key), w.Value) + } else { + q = q.Where("? 
= ?", pg.Safe(w.Key), w.Value) + } + } } if err := q.Select(); err != nil { @@ -964,6 +968,16 @@ func (ps *postgresService) PullRelevantAccountsFromStatus(targetStatus *gtsmodel MentionedAccounts: []*gtsmodel.Account{}, } + // get the author account + if targetStatus.GTSAuthorAccount == nil { + statusAuthor := >smodel.Account{} + if err := ps.conn.Model(statusAuthor).Where("id = ?", targetStatus.AccountID).Select(); err != nil { + return accounts, fmt.Errorf("PullRelevantAccountsFromStatus: error getting statusAuthor with id %s: %s", targetStatus.AccountID, err) + } + targetStatus.GTSAuthorAccount = statusAuthor + } + accounts.StatusAuthor = targetStatus.GTSAuthorAccount + // get the replied to account from the status and add it to the pile if targetStatus.InReplyToAccountID != "" { repliedToAccount := >smodel.Account{} @@ -1139,6 +1153,38 @@ func (ps *postgresService) WhoBoostedStatus(status *gtsmodel.Status) ([]*gtsmode return accounts, nil } +func (ps *postgresService) GetStatusesWhereFollowing(accountID string, limit int, offsetStatusID string) ([]*gtsmodel.Status, error) { + statuses := []*gtsmodel.Status{} + + q := ps.conn.Model(&statuses) + + q = q.ColumnExpr("status.*"). + Join("JOIN follows AS f ON f.target_account_id = status.account_id"). + Where("f.account_id = ?", accountID). 
+ Order("status.created_at DESC") + + if offsetStatusID != "" { + s := >smodel.Status{} + if err := ps.conn.Model(s).Where("id = ?", offsetStatusID).Select(); err != nil { + return nil, err + } + q = q.Where("status.created_at < ?", s.CreatedAt) + } + + if limit > 0 { + q = q.Limit(limit) + } + + err := q.Select() + if err != nil { + if err != pg.ErrNoRows { + return nil, err + } + } + + return statuses, nil +} + func (ps *postgresService) GetHomeTimelineForAccount(accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) { statuses := []*gtsmodel.Status{} diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index 16c00ca..6572b03 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -153,6 +153,7 @@ type VisibilityAdvanced struct { // RelevantAccounts denotes accounts that are replied to, boosted by, or mentioned in a status. type RelevantAccounts struct { + StatusAuthor *Account ReplyToAccount *Account BoostedAccount *Account BoostedReplyToAccount *Account diff --git a/internal/processing/processor.go b/internal/processing/processor.go index b31c37b..e4d860f 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -31,6 +31,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" ) @@ -132,9 +133,9 @@ type Processor interface { StatusGetContext(authed *oauth.Auth, targetStatusID string) (*apimodel.Context, ErrorWithCode) // HomeTimelineGet returns statuses from the home timeline, with the given filters/parameters. 
- HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) + HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) // PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. - PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) + PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) /* FEDERATION API-FACING PROCESSING FUNCTIONS @@ -182,34 +183,36 @@ type processor struct { // toClientAPI chan gtsmodel.ToClientAPI fromClientAPI chan gtsmodel.FromClientAPI // toFederator chan gtsmodel.ToFederator - fromFederator chan gtsmodel.FromFederator - federator federation.Federator - stop chan interface{} - log *logrus.Logger - config *config.Config - tc typeutils.TypeConverter - oauthServer oauth.Server - mediaHandler media.Handler - storage blob.Storage - db db.DB + fromFederator chan gtsmodel.FromFederator + federator federation.Federator + stop chan interface{} + log *logrus.Logger + config *config.Config + tc typeutils.TypeConverter + oauthServer oauth.Server + mediaHandler media.Handler + storage blob.Storage + timelineManager timeline.Manager + db db.DB } // NewProcessor returns a new Processor that uses the given federator and logger -func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage blob.Storage, db db.DB, log *logrus.Logger) Processor { +func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage blob.Storage, timelineManager timeline.Manager, db db.DB, log *logrus.Logger) 
Processor { return &processor{ // toClientAPI: make(chan gtsmodel.ToClientAPI, 100), fromClientAPI: make(chan gtsmodel.FromClientAPI, 100), // toFederator: make(chan gtsmodel.ToFederator, 100), - fromFederator: make(chan gtsmodel.FromFederator, 100), - federator: federator, - stop: make(chan interface{}), - log: log, - config: config, - tc: tc, - oauthServer: oauthServer, - mediaHandler: mediaHandler, - storage: storage, - db: db, + fromFederator: make(chan gtsmodel.FromFederator, 100), + federator: federator, + stop: make(chan interface{}), + log: log, + config: config, + tc: tc, + oauthServer: oauthServer, + mediaHandler: mediaHandler, + storage: storage, + timelineManager: timelineManager, + db: db, } } @@ -250,7 +253,7 @@ func (p *processor) Start() error { } } }() - return nil + return p.initTimelines() } // Stop stops the processor cleanly, finishing handling any remaining messages before closing down. diff --git a/internal/processing/timeline.go b/internal/processing/timeline.go index 7de2d63..b968156 100644 --- a/internal/processing/timeline.go +++ b/internal/processing/timeline.go @@ -20,28 +20,26 @@ package processing import ( "fmt" + "sync" + "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/oauth" ) -func (p *processor) HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) { - statuses, err := p.db.GetHomeTimelineForAccount(authed.Account.ID, maxID, sinceID, minID, limit, local) +func (p *processor) HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) { + + statuses, err := p.timelineManager.HomeTimeline(authed.Account.ID, maxID, sinceID, minID, limit, local) if err 
!= nil { return nil, NewErrorInternalError(err) } - s, err := p.filterStatuses(authed, statuses) - if err != nil { - return nil, NewErrorInternalError(err) - } - - return s, nil + return statuses, nil } -func (p *processor) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) { +func (p *processor) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) { statuses, err := p.db.GetPublicTimelineForAccount(authed.Account.ID, maxID, sinceID, minID, limit, local) if err != nil { return nil, NewErrorInternalError(err) @@ -55,10 +53,10 @@ func (p *processor) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID return s, nil } -func (p *processor) filterStatuses(authed *oauth.Auth, statuses []*gtsmodel.Status) ([]apimodel.Status, error) { +func (p *processor) filterStatuses(authed *oauth.Auth, statuses []*gtsmodel.Status) ([]*apimodel.Status, error) { l := p.log.WithField("func", "filterStatuses") - apiStatuses := []apimodel.Status{} + apiStatuses := []*apimodel.Status{} for _, s := range statuses { targetAccount := >smodel.Account{} if err := p.db.GetByID(s.AccountID, targetAccount); err != nil { @@ -115,8 +113,111 @@ func (p *processor) filterStatuses(authed *oauth.Auth, statuses []*gtsmodel.Stat continue } - apiStatuses = append(apiStatuses, *apiStatus) + apiStatuses = append(apiStatuses, apiStatus) } return apiStatuses, nil } + +func (p *processor) initTimelines() error { + // get all local accounts (ie., domain = nil) that aren't suspended (suspended_at = nil) + localAccounts := []*gtsmodel.Account{} + where := []db.Where{ + { + Key: "domain", Value: nil, + }, + { + Key: "suspended_at", Value: nil, + }, + } + if err := p.db.GetWhere(where, &localAccounts); err != nil { + if _, ok := err.(db.ErrNoEntries); ok { + return nil + } + return fmt.Errorf("initTimelines: db error initializing 
timelines: %s", err) + } + + // we want to wait until all timelines are populated so created a waitgroup here + wg := &sync.WaitGroup{} + wg.Add(len(localAccounts)) + + for _, localAccount := range localAccounts { + // to save time we can populate the timelines asynchronously + // this will go heavy on the database, but since we're not actually serving yet it doesn't really matter + go p.initTimelineFor(localAccount, wg) + } + + // wait for all timelines to be populated before we exit + wg.Wait() + return nil +} + +func (p *processor) initTimelineFor(account *gtsmodel.Account, wg *sync.WaitGroup) { + defer wg.Done() + + l := p.log.WithFields(logrus.Fields{ + "func": "initTimelineFor", + "accountID": account.ID, + }) + + desiredIndexLength := p.timelineManager.GetDesiredIndexLength() + + statuses, err := p.db.GetStatusesWhereFollowing(account.ID, desiredIndexLength, "") + if err != nil { + l.Error(fmt.Errorf("initTimelineFor: error getting statuses: %s", err)) + return + } + p.indexAndIngest(statuses, account, desiredIndexLength) + + lengthNow := p.timelineManager.GetIndexedLength(account.ID) + if lengthNow < desiredIndexLength { + // try and get more posts from the last ID onwards + rearmostStatusID, err := p.timelineManager.GetOldestIndexedID(account.ID) + if err != nil { + l.Error(fmt.Errorf("initTimelineFor: error getting id of rearmost status: %s", err)) + return + } + + if rearmostStatusID != "" { + moreStatuses, err := p.db.GetStatusesWhereFollowing(account.ID, desiredIndexLength / 2, rearmostStatusID) + if err != nil { + l.Error(fmt.Errorf("initTimelineFor: error getting more statuses: %s", err)) + return + } + p.indexAndIngest(moreStatuses, account, desiredIndexLength) + } + } + + l.Debugf("prepared timeline of length %d for account %s", lengthNow, account.ID) +} + +func (p *processor) indexAndIngest(statuses []*gtsmodel.Status, timelineAccount *gtsmodel.Account, desiredIndexLength int) { + l := p.log.WithFields(logrus.Fields{ + "func": "indexAndIngest", + 
"accountID": timelineAccount.ID, + }) + + for _, s := range statuses { + relevantAccounts, err := p.db.PullRelevantAccountsFromStatus(s) + if err != nil { + l.Error(fmt.Errorf("initTimelineFor: error getting relevant accounts from status %s: %s", s.ID, err)) + continue + } + visible, err := p.db.StatusVisible(s, relevantAccounts.StatusAuthor, timelineAccount, relevantAccounts) + if err != nil { + l.Error(fmt.Errorf("initTimelineFor: error checking visibility of status %s: %s", s.ID, err)) + continue + } + if visible { + if err := p.timelineManager.Ingest(s, timelineAccount.ID); err != nil { + l.Error(fmt.Errorf("initTimelineFor: error ingesting status %s: %s", s.ID, err)) + continue + } + + // check if we have enough posts now and return if we do + if p.timelineManager.GetIndexedLength(timelineAccount.ID) >= desiredIndexLength { + return + } + } + } +} diff --git a/internal/processing/timeline/manager.go b/internal/processing/timeline/manager.go index e8a8262..6758047 100644 --- a/internal/processing/timeline/manager.go +++ b/internal/processing/timeline/manager.go @@ -19,35 +19,158 @@ package timeline import ( + "sync" + + "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +const ( + preparedPostsMinLength = 80 + desiredPostIndexLength = 400 ) type Manager interface { - Ingest(status *gtsmodel.Status) error - HomeTimelineGet(account *gtsmodel.Account, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, error) + Ingest(status *gtsmodel.Status, timelineAccountID string) error + HomeTimeline(timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) + 
GetIndexedLength(timelineAccountID string) int + GetDesiredIndexLength() int + GetOldestIndexedID(timelineAccountID string) (string, error) + PrepareXFromTop(timelineAccountID string, limit int) error } -func NewManager(db db.DB, config *config.Config) Manager { +func NewManager(db db.DB, tc typeutils.TypeConverter, config *config.Config, log *logrus.Logger) Manager { return &manager{ - accountTimelines: make(map[string]*timeline), + accountTimelines: sync.Map{}, db: db, + tc: tc, config: config, + log: log, } } type manager struct { - accountTimelines map[string]*timeline + accountTimelines sync.Map db db.DB + tc typeutils.TypeConverter config *config.Config + log *logrus.Logger } -func (m *manager) Ingest(status *gtsmodel.Status) error { - return nil +func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) error { + l := m.log.WithFields(logrus.Fields{ + "func": "Ingest", + "timelineAccountID": timelineAccountID, + "statusID": status.ID, + }) + + var t Timeline + i, ok := m.accountTimelines.Load(timelineAccountID) + if !ok { + t = NewTimeline(timelineAccountID, m.db, m.tc) + m.accountTimelines.Store(timelineAccountID, t) + } else { + t = i.(Timeline) + } + + l.Trace("ingesting status") + + return t.IndexOne(status.CreatedAt, status.ID) } -func (m *manager) HomeTimelineGet(account *gtsmodel.Account, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, error) { - return nil, nil +func (m *manager) Remove(statusID string, timelineAccountID string) error { + l := m.log.WithFields(logrus.Fields{ + "func": "Remove", + "timelineAccountID": timelineAccountID, + "statusID": statusID, + }) + + var t Timeline + i, ok := m.accountTimelines.Load(timelineAccountID) + if !ok { + t = NewTimeline(timelineAccountID, m.db, m.tc) + m.accountTimelines.Store(timelineAccountID, t) + } else { + t = i.(Timeline) + } + + l.Trace("removing status") + + return t.Remove(statusID) +} + +func (m *manager) HomeTimeline(timelineAccountID 
string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) { + l := m.log.WithFields(logrus.Fields{ + "func": "HomeTimelineGet", + "timelineAccountID": timelineAccountID, + }) + + var t Timeline + i, ok := m.accountTimelines.Load(timelineAccountID) + if !ok { + t = NewTimeline(timelineAccountID, m.db, m.tc) + m.accountTimelines.Store(timelineAccountID, t) + } else { + t = i.(Timeline) + } + + var err error + var statuses []*apimodel.Status + if maxID != "" { + statuses, err = t.GetXBehindID(limit, maxID) + } else { + statuses, err = t.GetXFromTop(limit) + } + + if err != nil { + l.Errorf("error getting statuses: %s", err) + } + return statuses, nil +} + +func (m *manager) GetIndexedLength(timelineAccountID string) int { + var t Timeline + i, ok := m.accountTimelines.Load(timelineAccountID) + if !ok { + t = NewTimeline(timelineAccountID, m.db, m.tc) + m.accountTimelines.Store(timelineAccountID, t) + } else { + t = i.(Timeline) + } + + return t.PostIndexLength() +} + +func (m *manager) GetDesiredIndexLength() int { + return desiredPostIndexLength +} + +func (m *manager) GetOldestIndexedID(timelineAccountID string) (string, error) { + var t Timeline + i, ok := m.accountTimelines.Load(timelineAccountID) + if !ok { + t = NewTimeline(timelineAccountID, m.db, m.tc) + m.accountTimelines.Store(timelineAccountID, t) + } else { + t = i.(Timeline) + } + + return t.OldestIndexedPostID() +} + +func (m *manager) PrepareXFromTop(timelineAccountID string, limit int) error { + var t Timeline + i, ok := m.accountTimelines.Load(timelineAccountID) + if !ok { + t = NewTimeline(timelineAccountID, m.db, m.tc) + m.accountTimelines.Store(timelineAccountID, t) + } else { + t = i.(Timeline) + } + + return t.PrepareXFromTop(limit) } diff --git a/internal/processing/timeline/post.go b/internal/processing/timeline/post.go deleted file mode 100644 index eebe9a6..0000000 --- a/internal/processing/timeline/post.go +++ /dev/null @@ -1 +0,0 @@ -package 
timeline \ No newline at end of file diff --git a/internal/processing/timeline/postindex.go b/internal/processing/timeline/postindex.go new file mode 100644 index 0000000..2766bfc --- /dev/null +++ b/internal/processing/timeline/postindex.go @@ -0,0 +1,47 @@ +package timeline + +import ( + "container/list" + "errors" + "time" +) + +type postIndex struct { + data *list.List +} + +type postIndexEntry struct { + createdAt time.Time + statusID string +} + +func (p *postIndex) index(i *postIndexEntry) error { + + if p.data == nil { + p.data = &list.List{} + } + + // if we have no entries yet, this is both the newest and oldest entry, so just put it in the front + if p.data.Len() == 0 { + p.data.PushFront(i) + return nil + } + + // we need to iterate through the index to make sure we put this post in the appropriate place according to when it was created + for e := p.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*postIndexEntry) + if !ok { + return errors.New("index: could not parse e as a postIndexEntry") + } + + // if the post to index is newer than e, insert it before e in the list + if i.createdAt.After(entry.createdAt) { + p.data.InsertBefore(i, e) + return nil + } + } + + // if we reach this point it's the oldest post we've seen so put it at the back + p.data.PushBack(i) + return nil +} diff --git a/internal/processing/timeline/preparedposts.go b/internal/processing/timeline/preparedposts.go new file mode 100644 index 0000000..50a85a2 --- /dev/null +++ b/internal/processing/timeline/preparedposts.go @@ -0,0 +1,49 @@ +package timeline + +import ( + "container/list" + "errors" + "time" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +) + +type preparedPosts struct { + data *list.List +} + +type preparedPostsEntry struct { + createdAt time.Time + statusID string + prepared *apimodel.Status +} + +func (p *preparedPosts) insertPrepared(i *preparedPostsEntry) error { + if p.data == nil { + p.data = &list.List{} + } + + // if 
we have no entries yet, this is both the newest and oldest entry, so just put it in the front + if p.data.Len() == 0 { + p.data.PushFront(i) + return nil + } + + // we need to iterate through the index to make sure we put this post in the appropriate place according to when it was created + for e := p.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*preparedPostsEntry) + if !ok { + return errors.New("index: could not parse e as a preparedPostsEntry") + } + + // if the post to index is newer than e, insert it before e in the list + if i.createdAt.After(entry.createdAt) { + p.data.InsertBefore(i, e) + return nil + } + } + + // if we reach this point it's the oldest post we've seen so put it at the back + p.data.PushBack(i) + return nil +} diff --git a/internal/processing/timeline/sharedcache.go b/internal/processing/timeline/sharedcache.go deleted file mode 100644 index 7627e8d..0000000 --- a/internal/processing/timeline/sharedcache.go +++ /dev/null @@ -1,56 +0,0 @@ -package timeline - -import ( - "sort" - "sync" -) - -type sharedCache struct { - data map[string]*post - maxLength int - *sync.Mutex -} - -func newSharedCache(maxLength int) *sharedCache { - return &sharedCache{ - data: make(map[string]*post), - maxLength: maxLength, - } -} - -func (s *sharedCache) shrink() { - // check if the length is longer than max size - toRemove := len(s.data) - s.maxLength - - if toRemove > 0 { - // we have stuff to remove so lock the map while we work - s.Lock() - defer s.Unlock() - - // we need to time-sort the map to remove the oldest entries - // the below code gives us a slice of keys, arranged from newest to oldest - postSlice := make([]*post, 0, len(s.data)) - for _, v := range s.data { - postSlice = append(postSlice, v) - } - sort.Slice(postSlice, func(i int, j int) bool { - return postSlice[i].createdAt.After(postSlice[j].createdAt) - }) - - // now for each entry we have to remove, delete the entry from the map by its status ID - for i := 0; i < toRemove; i 
= i + 1 { - statusID := postSlice[i].statusID - delete(s.data, statusID) - } - } -} - -func (s *sharedCache) put(post *post) { - s.Lock() - defer s.Unlock() - s.data[post.statusID] = post -} - -func (s *sharedCache) get(statusID string) *post { - return s.data[statusID] -} diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 88e09f8..19f55db 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -27,151 +27,351 @@ import ( apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" ) const ( fromLatest = "FROM_LATEST" - postIndexMinLength = 200 - postIndexMaxLength = 400 - preparedPostsMaxLength = 400 - preparedPostsMinLength = 80 + preparedPostsMaxLength = desiredPostIndexLength ) -type timeline struct { - // - postIndex *list.List - preparedPosts *list.List - accountID string - db db.DB - *sync.Mutex +type Timeline interface { + // GetXFromTop returns x amount of posts from the top of the timeline, from newest to oldest. + GetXFromTop(amount int) ([]*apimodel.Status, error) + // GetXFromTop returns x amount of posts from the given id onwards, from newest to oldest. + GetXBehindID(amount int, fromID string) ([]*apimodel.Status, error) + + // IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property. + IndexOne(statusCreatedAt time.Time, statusID string) error + // IndexMany instructs the timeline to index all the given posts. + IndexMany([]*apimodel.Status) error + // Remove removes a status from the timeline. + Remove(statusID string) error + // OldestIndexedPostID returns the id of the rearmost (ie., the oldest) indexed post, or an error if something goes wrong. 
+ // If nothing goes wrong but there's no oldest post, an empty string will be returned so make sure to check for this. + OldestIndexedPostID() (string, error) + + // PrepareXFromTop instructs the timeline to prepare x amount of posts from the top of the timeline. + PrepareXFromTop(amount int) error + // PrepareXFromIndex instrucst the timeline to prepare the next amount of entries for serialization, from index onwards. + PrepareXFromIndex(amount int, index int) error + + // ActualPostIndexLength returns the actual length of the post index at this point in time. + PostIndexLength() int + + // Reset instructs the timeline to reset to its base state -- cache only the minimum amount of posts. + Reset() error } -func newTimeline(accountID string, db db.DB, sharedCache *list.List) *timeline { +type timeline struct { + postIndex *postIndex + preparedPosts *preparedPosts + accountID string + account *gtsmodel.Account + db db.DB + tc typeutils.TypeConverter + sync.Mutex +} + +func NewTimeline(accountID string, db db.DB, typeConverter typeutils.TypeConverter) Timeline { return &timeline{ - postIndex: list.New(), - preparedPosts: list.New(), + postIndex: &postIndex{}, + preparedPosts: &preparedPosts{}, accountID: accountID, db: db, + tc: typeConverter, } } -func (t *timeline) prepareNextXFromID(amount int, fromID string) error { +func (t *timeline) PrepareXFromIndex(amount int, index int) error { t.Lock() defer t.Unlock() - prepared := make([]*post, 0, amount) - - // find the mark in the index -- we want x statuses after this - var fromMark *list.Element - for e := t.postIndex.Front(); e != nil; e = e.Next() { - p, ok := e.Value.(*post) + var indexed int + var prepared int + var preparing bool + for e := t.postIndex.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*postIndexEntry) if !ok { - return errors.New("could not convert interface to post") + return errors.New("PrepareXFromTop: could not parse e as a postIndexEntry") } - if p.statusID == fromID { - 
fromMark = e + if !preparing { + // we haven't hit the index we need to prepare from yet + if indexed == index { + preparing = true + } + indexed = indexed + 1 + continue + } else { + if err := t.prepare(entry.statusID); err != nil { + return fmt.Errorf("PrepareXFromTop: error preparing status with id %s: %s", entry.statusID, err) + } + prepared = prepared + 1 + if prepared >= amount { + // we're done + fmt.Printf("\n\n\nprepared %d entries\n\n\n", prepared) + break + } + } + } + + return nil +} + +func (t *timeline) PrepareXFromTop(amount int) error { + t.Lock() + defer t.Unlock() + + t.preparedPosts.data.Init() + + var prepared int + for e := t.postIndex.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*postIndexEntry) + if !ok { + return errors.New("PrepareXFromTop: could not parse e as a postIndexEntry") + } + + if err := t.prepare(entry.statusID); err != nil { + return fmt.Errorf("PrepareXFromTop: error preparing status with id %s: %s", entry.statusID, err) + } + + prepared = prepared + 1 + if prepared >= amount { + // we're done break } } - if fromMark == nil { - // we can't find the given id in the index -_- - return fmt.Errorf("prepareNextXFromID: fromID %s not found in index", fromID) - } - - for e := fromMark.Next(); e != nil; e = e.Next() { - - } - return nil } -func (t *timeline) getXFromTop(amount int) ([]*apimodel.Status, error) { - statuses := []*apimodel.Status{} - if amount == 0 { - return statuses, nil +func (t *timeline) GetXFromTop(amount int) ([]*apimodel.Status, error) { + // make a slice of statuses with the length we need to return + statuses := make([]*apimodel.Status, 0, amount) + + // if there are no prepared posts, just return the empty slice + if t.preparedPosts.data == nil { + t.preparedPosts.data = &list.List{} } - if len(t.readyToGo) < amount { - if err := t.prepareNextXFromID(amount, fromLatest); err != nil { + // make sure we have enough posts prepared to return + if t.preparedPosts.data.Len() < amount { + if err := 
t.PrepareXFromTop(amount); err != nil { return nil, err } } - return t.readyToGo[:amount], nil -} - -// getXFromID gets x amount of posts in chronological order from the given ID onwards, NOT including the given id. -// The posts will be taken from the preparedPosts pile, unless nothing is ready to go. -func (t *timeline) getXFromID(amount int, fromID string) ([]*apimodel.Status, error) { - statuses := []*apimodel.Status{} - if amount == 0 || fromID == "" { - return statuses, nil - } - - // get the position of the given id in the ready to go pile - var indexOfID *int - for i, s := range t.readyToGo { - if s.ID == fromID { - indexOfID = &i + // work through the prepared posts from the top and return + var served int + for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*preparedPostsEntry) + if !ok { + return nil, errors.New("GetXFromTop: could not parse e as a preparedPostsEntry") + } + statuses = append(statuses, entry.prepared) + served = served + 1 + if served >= amount { + break } } - // the status isn't in the ready to go pile so prepare it - if indexOfID == nil { - if err := t.prepareNextXFromID(amount, fromID); err != nil { + return statuses, nil +} + +func (t *timeline) GetXBehindID(amount int, fromID string) ([]*apimodel.Status, error) { + // make a slice of statuses with the length we need to return + statuses := make([]*apimodel.Status, 0, amount) + + // if there are no prepared posts, just return the empty slice + if t.preparedPosts.data == nil { + t.preparedPosts.data = &list.List{} + } + + // find the position of id + var position int + for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*preparedPostsEntry) + if !ok { + return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry") + } + if entry.statusID == fromID { + fmt.Printf("\n\n\nfromid %s is at position %d\n\n\n", fromID, position) + break + } + position = position + 1 + } + + // make sure we have 
enough posts prepared to return + if t.preparedPosts.data.Len() < amount+position { + + if err := t.PrepareXFromIndex(amount, position); err != nil { return nil, err } } - return nil, nil + // iterate through the modified list until we hit the fromID again + var serving bool + var served int + for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*preparedPostsEntry) + if !ok { + return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry") + } + + if !serving { + // we're not serving yet but we might on the next time round if we hit our from id + if entry.statusID == fromID { + fmt.Printf("\n\n\nwe've hit fromid %s at position %d, will now serve\n\n\n", fromID, position) + serving = true + continue + } + } else { + // we're serving now! + statuses = append(statuses, entry.prepared) + served = served + 1 + if served >= amount { + break + } + } + } + + return statuses, nil } -func (t *timeline) insert(status *apimodel.Status) error { +func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error { t.Lock() defer t.Unlock() - createdAt, err := time.Parse(time.RFC3339, status.CreatedAt) - if err != nil { - return fmt.Errorf("insert: could not parse time %s: %s", status.CreatedAt, err) + postIndexEntry := &postIndexEntry{ + createdAt: statusCreatedAt, + statusID: statusID, } + return t.postIndex.index(postIndexEntry) +} - newPost := &post{ - createdAt: createdAt, - statusID: status.ID, - serialized: status, - } +func (t *timeline) Remove(statusID string) error { + t.Lock() + defer t.Unlock() - if t.index == nil { - t.index.PushFront(newPost) - } - - for e := t.index.Front(); e != nil; e = e.Next() { - p, ok := e.Value.(*post) + // remove the entry from the post index + for e := t.postIndex.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*postIndexEntry) if !ok { - return errors.New("could not convert interface to post") + return errors.New("Remove: could not parse e as a 
postIndexEntry") } - - if newPost.createdAt.After(p.createdAt) { - // this is a newer post so insert it just before the post it's newer than - t.index.InsertBefore(newPost, e) - return nil + if entry.statusID == statusID { + t.postIndex.data.Remove(e) + break // bail once we found it + } + } + + // remove the entry from prepared posts + for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*preparedPostsEntry) + if !ok { + return errors.New("Remove: could not parse e as a preparedPostsEntry") + } + if entry.statusID == statusID { + t.preparedPosts.data.Remove(e) + break // bail once we found it } } - // if we haven't returned yet it's the oldest post we've seen so shove it at the back - t.index.PushBack(newPost) return nil } -type preparedPostsEntry struct { - createdAt time.Time - statusID string - serialized *apimodel.Status +func (t *timeline) IndexMany(statuses []*apimodel.Status) error { + t.Lock() + defer t.Unlock() + + // add statuses to the index + for _, s := range statuses { + createdAt, err := time.Parse(time.RFC3339, s.CreatedAt) + if err != nil { + return fmt.Errorf("IndexMany: could not parse time %s on status id %s: %s", s.CreatedAt, s.ID, err) + } + postIndexEntry := &postIndexEntry{ + createdAt: createdAt, + statusID: s.ID, + } + if err := t.postIndex.index(postIndexEntry); err != nil { + return err + } + } + return nil } -type postIndexEntry struct { - createdAt time.Time - statusID string +func (t *timeline) Reset() error { + return nil +} + +func (t *timeline) PostIndexLength() int { + if t.postIndex == nil || t.postIndex.data == nil { + return 0 + } + + return t.postIndex.data.Len() +} + +func (t *timeline) OldestIndexedPostID() (string, error) { + var id string + if t.postIndex == nil || t.postIndex.data == nil { + return id, nil + } + e := t.postIndex.data.Back() + + if e == nil { + return id, nil + } + + entry, ok := e.Value.(*postIndexEntry) + if !ok { + return id, errors.New("OldestIndexedPostID: could not 
parse e as a postIndexEntry") + } + + return entry.statusID, nil +} + +func (t *timeline) prepare(statusID string) error { + gtsStatus := >smodel.Status{} + if err := t.db.GetByID(statusID, gtsStatus); err != nil { + return err + } + + if t.account == nil { + timelineOwnerAccount := >smodel.Account{} + if err := t.db.GetByID(t.accountID, timelineOwnerAccount); err != nil { + return err + } + t.account = timelineOwnerAccount + } + + relevantAccounts, err := t.db.PullRelevantAccountsFromStatus(gtsStatus) + if err != nil { + return err + } + + var reblogOfStatus *gtsmodel.Status + if gtsStatus.BoostOfID != "" { + s := >smodel.Status{} + if err := t.db.GetByID(gtsStatus.BoostOfID, s); err != nil { + return err + } + reblogOfStatus = s + } + + apiModelStatus, err := t.tc.StatusToMasto(gtsStatus, relevantAccounts.StatusAuthor, t.account, relevantAccounts.BoostedAccount, relevantAccounts.ReplyToAccount, reblogOfStatus) + if err != nil { + return err + } + + preparedPostsEntry := &preparedPostsEntry{ + createdAt: gtsStatus.CreatedAt, + statusID: statusID, + prepared: apiModelStatus, + } + + return t.preparedPosts.insertPrepared(preparedPostsEntry) } diff --git a/testrig/processor.go b/testrig/processor.go index d50748f..6f9a2a4 100644 --- a/testrig/processor.go +++ b/testrig/processor.go @@ -27,5 +27,5 @@ import ( // NewTestProcessor returns a Processor suitable for testing purposes func NewTestProcessor(db db.DB, storage blob.Storage, federator federation.Federator) processing.Processor { - return processing.NewProcessor(NewTestConfig(), NewTestTypeConverter(db), federator, NewTestOauthServer(db), NewTestMediaHandler(db, storage), storage, db, NewTestLog()) + return processing.NewProcessor(NewTestConfig(), NewTestTypeConverter(db), federator, NewTestOauthServer(db), NewTestMediaHandler(db, storage), storage, NewTestTimelineManager(db), db, NewTestLog()) } diff --git a/testrig/timelinemanager.go b/testrig/timelinemanager.go new file mode 100644 index 0000000..2dd2514 --- 
/dev/null +++ b/testrig/timelinemanager.go @@ -0,0 +1,10 @@ +package testrig + +import ( + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" +) + +func NewTestTimelineManager(db db.DB) timeline.Manager { + return timeline.NewManager(db, NewTestTypeConverter(db), NewTestConfig(), NewTestLog()) +}