From d26c6896961e3af3ad0d173b6d16b1dd815a25e4 Mon Sep 17 00:00:00 2001 From: tsmethurst Date: Tue, 1 Jun 2021 20:09:28 +0200 Subject: [PATCH] i have no idea what i'm doing --- internal/processing/timeline/sharedcache.go | 56 ++++++++++ internal/processing/timeline/sharedpool.go | 46 -------- internal/processing/timeline/timeline.go | 113 +++++++++++++++++--- 3 files changed, 152 insertions(+), 63 deletions(-) create mode 100644 internal/processing/timeline/sharedcache.go delete mode 100644 internal/processing/timeline/sharedpool.go diff --git a/internal/processing/timeline/sharedcache.go b/internal/processing/timeline/sharedcache.go new file mode 100644 index 0000000..7627e8d --- /dev/null +++ b/internal/processing/timeline/sharedcache.go @@ -0,0 +1,56 @@ +package timeline + +import ( + "sort" + "sync" +) + +type sharedCache struct { + data map[string]*post + maxLength int + *sync.Mutex +} + +func newSharedCache(maxLength int) *sharedCache { + return &sharedCache{ + data: make(map[string]*post), + maxLength: maxLength, + } +} + +func (s *sharedCache) shrink() { + // check if the length is longer than max size + toRemove := len(s.data) - s.maxLength + + if toRemove > 0 { + // we have stuff to remove so lock the map while we work + s.Lock() + defer s.Unlock() + + // we need to time-sort the map to remove the oldest entries + // the below code gives us a slice of keys, arranged from newest to oldest + postSlice := make([]*post, 0, len(s.data)) + for _, v := range s.data { + postSlice = append(postSlice, v) + } + sort.Slice(postSlice, func(i int, j int) bool { + return postSlice[i].createdAt.After(postSlice[j].createdAt) + }) + + // now for each entry we have to remove, delete the entry from the map by its status ID + for i := 0; i < toRemove; i = i + 1 { + statusID := postSlice[i].statusID + delete(s.data, statusID) + } + } +} + +func (s *sharedCache) put(post *post) { + s.Lock() + defer s.Unlock() + s.data[post.statusID] = post +} + +func (s *sharedCache) get(statusID string) *post { + return s.data[statusID] +} diff --git a/internal/processing/timeline/sharedpool.go b/internal/processing/timeline/sharedpool.go deleted file mode 100644 index 36cdf53..0000000 --- a/internal/processing/timeline/sharedpool.go +++ /dev/null @@ -1,46 +0,0 @@ -package timeline - -import ( - "sync" - "time" -) - -type sharedCache struct { - data map[string]*post - maxLength int - *sync.Mutex -} - -func newSharedCache(maxLength int) *sharedCache { - return &sharedCache{ - data: make(map[string]*post), - maxLength: maxLength, - } -} - -func (s *sharedCache) shrink() { - toRemove := len(s.data) - s.maxLength - if toRemove > 0 { - s.Lock() - defer s.Unlock() - oldest := time.Now() - oldestIDs := make([]string, toRemove) - for id, post := range s.data { - if post.createdAt.Before(oldest) { - oldest = post.createdAt - oldestIDs = append(oldestIDs, id) - } - } - - } -} - -func (s *sharedCache) put(post *post) { - s.Lock() - defer s.Unlock() - s.data[post.statusID] = post -} - -func (s *sharedCache) get(statusID string) *post { - return s.data[statusID] -} diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 10d6d3c..00fff1a 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -21,6 +21,7 @@ package timeline import ( "container/list" "errors" + "fmt" "sync" "time" @@ -28,33 +29,96 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/db" ) +const fromLatest = "FROM_LATEST" + type timeline struct { - index *list.List - readyToGo []*apimodel.Status - sharedCache *list.List - accountID string - db db.DB + postIndex *list.List + preparedPosts *list.List + sharedCache *list.List + accountID string + db db.DB *sync.Mutex } -func newTimeline(accountID string, db db.DB) *timeline { +func newTimeline(accountID string, db db.DB, sharedCache *list.List) *timeline { return &timeline{ - index: list.New(), - readyToGo: []*apimodel.Status{}, - accountID: accountID, + postIndex: list.New(), + preparedPosts: list.New(), + sharedCache: sharedCache, + accountID: accountID, } } -func (t *timeline) prepareXFromID(limit int, statusID string) error { +func (t *timeline) prepareNextXFromID(amount int, fromID string) error { t.Lock() defer t.Unlock() + prepared := make([]*post, 0, amount) + + // find the mark in the index -- we want x statuses after this + var fromMark *list.Element + for e := t.postIndex.Front(); e != nil; e = e.Next() { + p, ok := e.Value.(*post) + if !ok { + return errors.New("could not convert interface to post") + } + + if p.statusID == fromID { + fromMark = e + break + } + } + + if fromMark == nil { + // we can't find the given id in the index -_- + return fmt.Errorf("prepareNextXFromID: fromID %s not found in index", fromID) + } + + for e := fromMark.Next(); e != nil; e = e.Next() { + + } + return nil } -func (t *timeline) getX(limit int) (*apimodel.Status, error) { - t.Lock() - defer t.Unlock() +func (t *timeline) getXFromTop(amount int) ([]*apimodel.Status, error) { + statuses := []*apimodel.Status{} + if amount == 0 { + return statuses, nil + } + + if len(t.readyToGo) < amount { + if err := t.prepareNextXFromID(amount, fromLatest); err != nil { + return nil, err + } + } + + return t.readyToGo[:amount], nil +} + +// getXFromID gets x amount of posts in chronological order from the given ID onwards, NOT including the given id. +// The posts will be taken from the readyToGo pile, unless nothing is ready to go. +func (t *timeline) getXFromID(amount int, fromID string) ([]*apimodel.Status, error) { + statuses := []*apimodel.Status{} + if amount == 0 || fromID == "" { + return statuses, nil + } + + // get the position of the given id in the ready to go pile + var indexOfID *int + for i, s := range t.readyToGo { + if s.ID == fromID { + indexOfID = &i + } + } + + // the status isn't in the ready to go pile so prepare it + if indexOfID == nil { + if err := t.prepareNextXFromID(amount, fromID); err != nil { + return nil, err + } + } + return nil, nil } @@ -62,7 +126,16 @@ func (t *timeline) insert(status *apimodel.Status) error { t.Lock() defer t.Unlock() - newPost := &post{} + createdAt, err := time.Parse(time.RFC3339, status.CreatedAt) + if err != nil { + return fmt.Errorf("insert: could not parse time %s: %s", status.CreatedAt, err) + } + + newPost := &post{ + createdAt: createdAt, + statusID: status.ID, + serialized: status, + } if t.index == nil { t.index.PushFront(newPost) @@ -73,15 +146,21 @@ func (t *timeline) insert(status *apimodel.Status) error { if !ok { return errors.New("could not convert interface to post") } - if p.createdAt.Before(newPost.createdAt) { + if newPost.createdAt.After(p.createdAt) { + // this is a newer post so insert it just before the post it's newer than + t.index.InsertBefore(newPost, e) + return nil } } + + // if we haven't returned yet it's the oldest post we've seen so shove it at the back + t.index.PushBack(newPost) return nil } type post struct { - createdAt time.Time - statusID string + createdAt time.Time + statusID string serialized *apimodel.Status }