diff --git a/internal/db/pg/pg.go b/internal/db/pg/pg.go index 887df92..9daf94e 100644 --- a/internal/db/pg/pg.go +++ b/internal/db/pg/pg.go @@ -975,6 +975,7 @@ func (ps *postgresService) GetPublicTimelineForAccount(accountID string, maxID s q := ps.conn.Model(&statuses). Where("visibility = ?", gtsmodel.VisibilityPublic). Where("? IS NULL", pg.Ident("in_reply_to_id")). + Where("? IS NULL", pg.Ident("in_reply_to_uri")). Where("? IS NULL", pg.Ident("boost_of_id")). Order("status.id DESC") diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index 719d72e..8614f09 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -96,6 +96,16 @@ func (p *processor) notifyStatus(status *gtsmodel.Status) error { if err := p.db.Put(notif); err != nil { return fmt.Errorf("notifyStatus: error putting notification in database: %s", err) } + + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, m.GTSAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } } return nil @@ -123,6 +133,16 @@ func (p *processor) notifyFollowRequest(followRequest *gtsmodel.FollowRequest, r return fmt.Errorf("notifyFollowRequest: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -157,6 +177,16 @@ func (p *processor) notifyFollow(follow *gtsmodel.Follow, receivingAccount *gtsm return fmt.Errorf("notifyFollow: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -183,6 +213,16 @@ func (p *processor) notifyFave(fave *gtsmodel.StatusFave, receivingAccount *gtsm return fmt.Errorf("notifyFave: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -242,6 +282,16 @@ func (p *processor) notifyAnnounce(status *gtsmodel.Status) error { return fmt.Errorf("notifyAnnounce: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, boostedAcct); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -321,16 +371,22 @@ func (p *processor) timelineStatusForAccount(status *gtsmodel.Status, accountID return } - if err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID); err != nil { + // stick the status in the timeline for the account and then immediately prepare it so they can see it right away + inserted, err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID) + if err != nil { errors <- fmt.Errorf("timelineStatusForAccount: error ingesting status %s: %s", status.ID, err) + return } - mastoStatus, err := p.tc.StatusToMasto(status, timelineAccount) - if err != nil { - errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) - } else { - if err := p.streamingProcessor.StreamStatusToAccount(mastoStatus, timelineAccount); err != nil { - errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) + // the status was inserted to stream it to the user + if inserted { + mastoStatus, err := p.tc.StatusToMasto(status, timelineAccount) + if err != nil { + errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) + } else { + if err := p.streamingProcessor.StreamStatusToAccount(mastoStatus, timelineAccount); err != nil { + errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) + } } } } diff --git a/internal/processing/synchronous/streaming/streaming.go b/internal/processing/synchronous/streaming/streaming.go index 848da32..9fd6285 100644 --- a/internal/processing/synchronous/streaming/streaming.go +++ b/internal/processing/synchronous/streaming/streaming.go @@ -20,6 +20,7 @@ type Processor interface { AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error + StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error } type processor struct { diff --git a/internal/processing/synchronous/streaming/streamnotification.go b/internal/processing/synchronous/streaming/streamnotification.go new file mode 100644 index 0000000..24c8342 --- /dev/null +++ b/internal/processing/synchronous/streaming/streamnotification.go @@ -0,0 +1,50 @@ +package streaming + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/sirupsen/logrus" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error { + l := p.log.WithFields(logrus.Fields{ + "func": "StreamNotificationToAccount", + "account": account.ID, + }) + v, ok := p.streamMap.Load(account.ID) + if !ok { + // no open connections so nothing to stream + return nil + } + + streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + if !ok { + return errors.New("stream map error") + } + + notificationBytes, err := json.Marshal(n) + if err != nil { + return fmt.Errorf("error marshalling notification to json: %s", err) + } + + streamsForAccount.Lock() + defer streamsForAccount.Unlock() + for _, stream := range streamsForAccount.Streams { + stream.Lock() + defer stream.Unlock() + if stream.Connected { + l.Debugf("streaming notification to stream id %s", stream.ID) + stream.Messages <- >smodel.Message{ + Stream: []string{stream.Type}, + Event: "notification", + Payload: string(notificationBytes), + } + } + } + + return nil +} diff --git a/internal/processing/timeline.go b/internal/processing/timeline.go index a8f42d6..8f6b1d2 100644 --- a/internal/processing/timeline.go +++ b/internal/processing/timeline.go @@ -201,7 +201,7 @@ func (p *processor) indexAndIngest(statuses []*gtsmodel.Status, timelineAccount continue } if timelineable { - if err := p.timelineManager.Ingest(s, timelineAccount.ID); err != nil { + if _, err := p.timelineManager.Ingest(s, timelineAccount.ID); err != nil { l.Error(fmt.Errorf("initTimelineFor: error ingesting status %s: %s", s.ID, err)) continue } diff --git a/internal/timeline/index.go b/internal/timeline/index.go index bc1bf99..8d0506a 100644 --- a/internal/timeline/index.go +++ b/internal/timeline/index.go @@ -40,7 +40,7 @@ grabloop: } for _, s := range filtered { - if err := t.IndexOne(s.CreatedAt, s.ID, s.BoostOfID); err != nil { + if _, err := t.IndexOne(s.CreatedAt, s.ID, s.BoostOfID); err != nil { return fmt.Errorf("IndexBehindAndIncluding: error indexing status with id %s: %s", s.ID, err) } } @@ -52,7 +52,7 @@ func (t *timeline) IndexOneByID(statusID string) error { return nil } -func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string, boostOfID string) error { +func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string, boostOfID string) (bool, error) { t.Lock() defer t.Unlock() @@ -64,7 +64,7 @@ func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string, boostOfI return t.postIndex.insertIndexed(postIndexEntry) } -func (t *timeline) IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) error { +func (t *timeline) IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) (bool, error) { t.Lock() defer t.Unlock() @@ -72,15 +72,18 @@ func (t *timeline) IndexAndPrepareOne(statusCreatedAt time.Time, statusID string statusID: statusID, } - if err := t.postIndex.insertIndexed(postIndexEntry); err != nil { - return fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err) + inserted, err := t.postIndex.insertIndexed(postIndexEntry) + if err != nil { + return inserted, fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err) } - if err := t.prepare(statusID); err != nil { - return fmt.Errorf("IndexAndPrepareOne: error preparing: %s", err) + if inserted { + if err := t.prepare(statusID); err != nil { + return inserted, fmt.Errorf("IndexAndPrepareOne: error preparing: %s", err) + } } - return nil + return inserted, nil } func (t *timeline) OldestIndexedPostID() (string, error) { diff --git a/internal/timeline/manager.go b/internal/timeline/manager.go index c389a6b..2770f9e 100644 --- a/internal/timeline/manager.go +++ b/internal/timeline/manager.go @@ -51,12 +51,18 @@ type Manager interface { // Ingest takes one status and indexes it into the timeline for the given account ID. // // It should already be established before calling this function that the status/post actually belongs in the timeline! - Ingest(status *gtsmodel.Status, timelineAccountID string) error + // + // The returned bool indicates whether the status was actually put in the timeline. This could be false in cases where + // the status is a boost, but a boost of the original post or the post itself already exists recently in the timeline. + Ingest(status *gtsmodel.Status, timelineAccountID string) (bool, error) // IngestAndPrepare takes one status and indexes it into the timeline for the given account ID, and then immediately prepares it for serving. // This is useful in cases where we know the status will need to be shown at the top of a user's timeline immediately (eg., a new status is created). // // It should already be established before calling this function that the status/post actually belongs in the timeline! - IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) error + // + // The returned bool indicates whether the status was actually put in the timeline. This could be false in cases where + // the status is a boost, but a boost of the original post or the post itself already exists recently in the timeline. + IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) (bool, error) // HomeTimeline returns limit n amount of entries from the home timeline of the given account ID, in descending chronological order. // If maxID is provided, it will return entries from that maxID onwards, inclusive. HomeTimeline(accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) @@ -95,7 +101,7 @@ type manager struct { log *logrus.Logger } -func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) error { +func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) (bool, error) { l := m.log.WithFields(logrus.Fields{ "func": "Ingest", "timelineAccountID": timelineAccountID, @@ -108,7 +114,7 @@ func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) erro return t.IndexOne(status.CreatedAt, status.ID, status.BoostOfID) } -func (m *manager) IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) error { +func (m *manager) IngestAndPrepare(status *gtsmodel.Status, timelineAccountID string) (bool, error) { l := m.log.WithFields(logrus.Fields{ "func": "IngestAndPrepare", "timelineAccountID": timelineAccountID, diff --git a/internal/timeline/postindex.go b/internal/timeline/postindex.go index 7142035..44765bf 100644 --- a/internal/timeline/postindex.go +++ b/internal/timeline/postindex.go @@ -14,7 +14,7 @@ type postIndexEntry struct { boostOfID string } -func (p *postIndex) insertIndexed(i *postIndexEntry) error { +func (p *postIndex) insertIndexed(i *postIndexEntry) (bool, error) { if p.data == nil { p.data = &list.List{} } @@ -22,7 +22,7 @@ func (p *postIndex) insertIndexed(i *postIndexEntry) error { // if we have no entries yet, this is both the newest and oldest entry, so just put it in the front if p.data.Len() == 0 { p.data.PushFront(i) - return nil + return true, nil } var insertMark *list.Element @@ -34,14 +34,14 @@ func (p *postIndex) insertIndexed(i *postIndexEntry) error { entry, ok := e.Value.(*postIndexEntry) if !ok { - return errors.New("index: could not parse e as a postIndexEntry") + return false, errors.New("index: could not parse e as a postIndexEntry") } // don't insert this if it's a boost of a status we've seen recently if i.boostOfID != "" { if i.boostOfID == entry.boostOfID || i.boostOfID == entry.statusID { if position < boostReinsertionDepth { - return nil + return false, nil } } } @@ -55,16 +55,16 @@ func (p *postIndex) insertIndexed(i *postIndexEntry) error { // make sure we don't insert a duplicate if entry.statusID == i.statusID { - return nil + return false, nil } } if insertMark != nil { p.data.InsertBefore(i, insertMark) - return nil + return true, nil } // if we reach this point it's the oldest post we've seen so put it at the back p.data.PushBack(i) - return nil + return true, nil } diff --git a/internal/timeline/timeline.go b/internal/timeline/timeline.go index 363c099..5e274b5 100644 --- a/internal/timeline/timeline.go +++ b/internal/timeline/timeline.go @@ -62,7 +62,10 @@ type Timeline interface { */ // IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property. - IndexOne(statusCreatedAt time.Time, statusID string, boostOfID string) error + // + // The returned bool indicates whether or not the status was actually inserted into the timeline. This will be false + // if the status is a boost and the original post or another boost of it already exists < boostReinsertionDepth back in the timeline. + IndexOne(statusCreatedAt time.Time, statusID string, boostOfID string) (bool, error) // OldestIndexedPostID returns the id of the rearmost (ie., the oldest) indexed post, or an error if something goes wrong. // If nothing goes wrong but there's no oldest post, an empty string will be returned so make sure to check for this. @@ -79,7 +82,10 @@ type Timeline interface { PrepareBehind(statusID string, amount int) error // IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property, // and then immediately prepares it. - IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) error + // + // The returned bool indicates whether or not the status was actually inserted into the timeline. This will be false + // if the status is a boost and the original post or another boost of it already exists < boostReinsertionDepth back in the timeline. + IndexAndPrepareOne(statusCreatedAt time.Time, statusID string) (bool, error) // OldestPreparedPostID returns the id of the rearmost (ie., the oldest) prepared post, or an error if something goes wrong. // If nothing goes wrong but there's no oldest post, an empty string will be returned so make sure to check for this. OldestPreparedPostID() (string, error)