remote boosts incoming now working

This commit is contained in:
tsmethurst 2021-05-27 22:26:33 +02:00
parent 40add68691
commit a47085d141
12 changed files with 412 additions and 13 deletions

View File

@ -0,0 +1,73 @@
package federatingdb
import (
"context"
"encoding/json"
"fmt"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
l := f.log.WithFields(
logrus.Fields{
"func": "Announce",
},
)
m, err := streams.Serialize(announce)
if err != nil {
return err
}
b, err := json.Marshal(m)
if err != nil {
return err
}
l.Debugf("received ANNOUNCE %s", string(b))
targetAcctI := ctx.Value(util.APAccount)
if targetAcctI == nil {
l.Error("target account wasn't set on context")
return nil
}
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
if !ok {
l.Error("target account was set on context but couldn't be parsed")
return nil
}
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
if fromFederatorChanI == nil {
l.Error("from federator channel wasn't set on context")
return nil
}
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
if !ok {
l.Error("from federator channel was set on context but couldn't be parsed")
return nil
}
boost, isNew, err := f.typeConverter.ASAnnounceToStatus(announce)
if err != nil {
return fmt.Errorf("Announce: error converting announce to boost: %s", err)
}
if !isNew {
// nothing to do here if this isn't a new announce
return nil
}
// it's a new announce so pass it back to the processor async for dereferencing etc
fromFederatorChan <- gtsmodel.FromFederator{
APObjectType: gtsmodel.ActivityStreamsAnnounce,
APActivityType: gtsmodel.ActivityStreamsCreate,
GTSModel: boost,
ReceivingAccount: targetAcct,
}
return nil
}

View File

@ -35,6 +35,7 @@ type DB interface {
pub.Database
Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error
Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error
Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error
}
// FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.

View File

@ -16,6 +16,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package federation
package federatingdb_test
// TODO: write tests for pgfed

View File

@ -257,6 +257,10 @@ func (f *federator) FederatingCallbacks(ctx context.Context) (wrapped pub.Federa
func(ctx context.Context, accept vocab.ActivityStreamsAccept) error {
return f.FederatingDB().Accept(ctx, accept)
},
// override default announce behavior and trigger our own side effects
func(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
return f.FederatingDB().Announce(ctx, announce)
},
}
return

View File

@ -43,6 +43,9 @@ type Federator interface {
// DereferenceRemoteAccount can be used to get the representation of a remote account, based on the account ID (which is a URI).
// The given username will be used to create a transport for making outgoing requests. See the implementation for more detailed comments.
DereferenceRemoteAccount(username string, remoteAccountID *url.URL) (typeutils.Accountable, error)
// DereferenceRemoteStatus can be used to get the representation of a remote status, based on its ID (which is a URI).
// The given username will be used to create a transport for making outgoing requests. See the implementation for more detailed comments.
DereferenceRemoteStatus(username string, remoteStatusID *url.URL) (typeutils.Statusable, error)
// GetTransportForUser returns a new transport initialized with the key credentials belonging to the given username.
// This can be used for making signed http requests.
//

View File

@ -258,6 +258,88 @@ func (f *federator) DereferenceRemoteAccount(username string, remoteAccountID *u
return nil, fmt.Errorf("type name %s not supported", t.GetTypeName())
}
func (f *federator) DereferenceRemoteStatus(username string, remoteStatusID *url.URL) (typeutils.Statusable, error) {
transport, err := f.GetTransportForUser(username)
if err != nil {
return nil, fmt.Errorf("transport err: %s", err)
}
b, err := transport.Dereference(context.Background(), remoteStatusID)
if err != nil {
return nil, fmt.Errorf("error deferencing %s: %s", remoteStatusID.String(), err)
}
m := make(map[string]interface{})
if err := json.Unmarshal(b, &m); err != nil {
return nil, fmt.Errorf("error unmarshalling bytes into json: %s", err)
}
t, err := streams.ToType(context.Background(), m)
if err != nil {
return nil, fmt.Errorf("error resolving json into ap vocab type: %s", err)
}
// Article, Document, Image, Video, Note, Page, Event, Place, Mention, Profile
switch t.GetTypeName() {
case gtsmodel.ActivityStreamsArticle:
p, ok := t.(vocab.ActivityStreamsArticle)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsArticle")
}
return p, nil
case gtsmodel.ActivityStreamsDocument:
p, ok := t.(vocab.ActivityStreamsDocument)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsDocument")
}
return p, nil
case gtsmodel.ActivityStreamsImage:
p, ok := t.(vocab.ActivityStreamsImage)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsImage")
}
return p, nil
case gtsmodel.ActivityStreamsVideo:
p, ok := t.(vocab.ActivityStreamsVideo)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsVideo")
}
return p, nil
case gtsmodel.ActivityStreamsNote:
p, ok := t.(vocab.ActivityStreamsNote)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsNote")
}
return p, nil
case gtsmodel.ActivityStreamsPage:
p, ok := t.(vocab.ActivityStreamsPage)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsPage")
}
return p, nil
case gtsmodel.ActivityStreamsEvent:
p, ok := t.(vocab.ActivityStreamsEvent)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsEvent")
}
return p, nil
case gtsmodel.ActivityStreamsPlace:
p, ok := t.(vocab.ActivityStreamsPlace)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsPlace")
}
return p, nil
case gtsmodel.ActivityStreamsProfile:
p, ok := t.(vocab.ActivityStreamsProfile)
if !ok {
return nil, errors.New("error resolving type as ActivityStreamsProfile")
}
return p, nil
}
return nil, fmt.Errorf("type name %s not supported", t.GetTypeName())
}
func (f *federator) GetTransportForUser(username string) (transport.Transport, error) {
// We need an account to use to create a transport for dereferecing the signature.
// If a username has been given, we can fetch the account with that username and use it.
@ -279,5 +361,3 @@ func (f *federator) GetTransportForUser(username string) (transport.Transport, e
}
return transport, nil
}

View File

@ -24,7 +24,7 @@ const (
// ActivityStreamsAudio https://www.w3.org/TR/activitystreams-vocabulary/#dfn-audio
ActivityStreamsAudio = "Audio"
// ActivityStreamsDocument https://www.w3.org/TR/activitystreams-vocabulary/#dfn-document
ActivityStreamsDocument = "Event"
ActivityStreamsDocument = "Document"
// ActivityStreamsEvent https://www.w3.org/TR/activitystreams-vocabulary/#dfn-event
ActivityStreamsEvent = "Event"
// ActivityStreamsImage https://www.w3.org/TR/activitystreams-vocabulary/#dfn-image

View File

@ -158,3 +158,7 @@ func (p *processor) notifyFave(fave *gtsmodel.StatusFave, receivingAccount *gtsm
return nil
}
func (p *processor) notifyAnnounce(status *gtsmodel.Status) error {
return nil
}

View File

@ -27,7 +27,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/transport"
)
func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) error {
@ -50,7 +49,7 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
}
l.Debug("will now derefence incoming status")
if err := p.dereferenceStatusFields(incomingStatus); err != nil {
if err := p.dereferenceStatusFields(incomingStatus, federatorMsg.ReceivingAccount.Username); err != nil {
return fmt.Errorf("error dereferencing status from federator: %s", err)
}
if err := p.db.UpdateByID(incomingStatus.ID, incomingStatus); err != nil {
@ -94,6 +93,24 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
if err := p.notifyFollowRequest(incomingFollowRequest, federatorMsg.ReceivingAccount); err != nil {
return err
}
case gtsmodel.ActivityStreamsAnnounce:
// CREATE AN ANNOUNCE
incomingAnnounce, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
if !ok {
return errors.New("announce was not parseable as *gtsmodel.Status")
}
if err := p.dereferenceAnnounce(incomingAnnounce, federatorMsg.ReceivingAccount.Username); err != nil {
return fmt.Errorf("error dereferencing announce from federator: %s", err)
}
if err := p.db.Put(incomingAnnounce); err != nil {
return fmt.Errorf("error adding dereferenced announce to the db: %s", err)
}
if err := p.notifyAnnounce(incomingAnnounce); err != nil {
return err
}
}
case gtsmodel.ActivityStreamsUpdate:
// UPDATE
@ -168,18 +185,14 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
// This function will deference all of the above, insert them in the database as necessary,
// and attach them to the status. The status itself will not be added to the database yet,
// that's up the caller to do.
func (p *processor) dereferenceStatusFields(status *gtsmodel.Status) error {
func (p *processor) dereferenceStatusFields(status *gtsmodel.Status, requestingUsername string) error {
l := p.log.WithFields(logrus.Fields{
"func": "dereferenceStatusFields",
"status": fmt.Sprintf("%+v", status),
})
l.Debug("entering function")
var t transport.Transport
var err error
var username string
// TODO: dereference with a user that's addressed by the status
t, err = p.federator.GetTransportForUser(username)
t, err := p.federator.GetTransportForUser(requestingUsername)
if err != nil {
return fmt.Errorf("error creating transport: %s", err)
}
@ -260,7 +273,7 @@ func (p *processor) dereferenceStatusFields(status *gtsmodel.Status) error {
}
// we just don't have it yet, so we should go get it....
accountable, err := p.federator.DereferenceRemoteAccount(username, uri)
accountable, err := p.federator.DereferenceRemoteAccount(requestingUsername, uri)
if err != nil {
// we can't dereference it so just skip it
l.Debugf("error dereferencing remote account with uri %s: %s", uri.String(), err)
@ -313,3 +326,106 @@ func (p *processor) dereferenceAccountFields(account *gtsmodel.Account, requesti
return nil
}
func (p *processor) dereferenceAnnounce(announce *gtsmodel.Status, requestingUsername string) error {
if announce.GTSBoostedStatus == nil || announce.GTSBoostedStatus.URI == "" {
// we can't do anything unfortunately
return errors.New("dereferenceAnnounce: no URI to dereference")
}
// check if we already have the boosted status in the database
boostedStatus := &gtsmodel.Status{}
err := p.db.GetWhere([]db.Where{{Key: "uri", Value: announce.GTSBoostedStatus.URI}}, boostedStatus)
if err == nil {
// nice, we already have it so we don't actually need to dereference it from remote
announce.Content = boostedStatus.Content
announce.ContentWarning = boostedStatus.ContentWarning
announce.ActivityStreamsType = boostedStatus.ActivityStreamsType
announce.Sensitive = boostedStatus.Sensitive
announce.Language = boostedStatus.Language
announce.Text = boostedStatus.Text
announce.BoostOfID = boostedStatus.ID
announce.Visibility = boostedStatus.Visibility
announce.VisibilityAdvanced = boostedStatus.VisibilityAdvanced
announce.GTSBoostedStatus = boostedStatus
return nil
}
// we don't have it so we need to dereference it
remoteStatusID, err := url.Parse(announce.GTSBoostedStatus.URI)
if err != nil {
return fmt.Errorf("dereferenceAnnounce: error parsing url %s: %s", announce.GTSBoostedStatus.URI, err)
}
statusable, err := p.federator.DereferenceRemoteStatus(requestingUsername, remoteStatusID)
if err != nil {
return fmt.Errorf("dereferenceAnnounce: error dereferencing remote status with id %s: %s", announce.GTSBoostedStatus.URI, err)
}
// make sure we have the author account in the db
attributedToProp := statusable.GetActivityStreamsAttributedTo()
for iter := attributedToProp.Begin(); iter != attributedToProp.End(); iter = iter.Next() {
accountURI := iter.GetIRI()
if accountURI == nil {
continue
}
if err := p.db.GetWhere([]db.Where{{Key: "uri", Value: accountURI.String()}}, &gtsmodel.Account{}); err == nil {
// we already have it, fine
continue
}
// we don't have the boosted status author account yet so dereference it
accountable, err := p.federator.DereferenceRemoteAccount(requestingUsername, accountURI)
if err != nil {
return fmt.Errorf("dereferenceAnnounce: error dereferencing remote account with id %s: %s", accountURI.String(), err)
}
account, err := p.tc.ASRepresentationToAccount(accountable, false)
if err != nil {
return fmt.Errorf("dereferenceAnnounce: error converting dereferenced account with id %s into account : %s", accountURI.String(), err)
}
// insert the dereferenced account so it gets an ID etc
if err := p.db.Put(account); err != nil {
return fmt.Errorf("dereferenceAnnounce: error putting dereferenced account with id %s into database : %s", accountURI.String(), err)
}
if err := p.dereferenceAccountFields(account, requestingUsername, false); err != nil {
return fmt.Errorf("dereferenceAnnounce: error dereferencing fields on account with id %s : %s", accountURI.String(), err)
}
}
// now convert the statusable into something we can understand
boostedStatus, err = p.tc.ASStatusToStatus(statusable)
if err != nil {
return fmt.Errorf("dereferenceAnnounce: error converting dereferenced statusable with id %s into status : %s", announce.GTSBoostedStatus.URI, err)
}
// put it in the db already so it gets an ID generated for it
if err := p.db.Put(boostedStatus); err != nil {
return fmt.Errorf("dereferenceAnnounce: error putting dereferenced status with id %s into the db: %s", announce.GTSBoostedStatus.URI, err)
}
// now dereference additional fields straight away (we're already async here so we have time)
if err := p.dereferenceStatusFields(boostedStatus, requestingUsername); err != nil {
return fmt.Errorf("dereferenceAnnounce: error dereferencing status fields for status with id %s: %s", announce.GTSBoostedStatus.URI, err)
}
// update with the newly dereferenced fields
if err := p.db.UpdateByID(boostedStatus.ID, boostedStatus); err != nil {
return fmt.Errorf("dereferenceAnnounce: error updating dereferenced status in the db: %s", err)
}
// we have everything we need!
announce.Content = boostedStatus.Content
announce.ContentWarning = boostedStatus.ContentWarning
announce.ActivityStreamsType = boostedStatus.ActivityStreamsType
announce.Sensitive = boostedStatus.Sensitive
announce.Language = boostedStatus.Language
announce.Text = boostedStatus.Text
announce.BoostOfID = boostedStatus.ID
announce.Visibility = boostedStatus.Visibility
announce.VisibilityAdvanced = boostedStatus.VisibilityAdvanced
announce.GTSBoostedStatus = boostedStatus
return nil
}

View File

@ -111,6 +111,18 @@ type Likeable interface {
withObject
}
// Announceable represents the minimum interface for an activitystreams 'announce' activity.
type Announceable interface {
withJSONLDId
withTypeName
withActor
withObject
withPublished
withTo
withCC
}
type withJSONLDId interface {
GetJSONLDId() vocab.JSONLDIdProperty
}

View File

@ -422,6 +422,99 @@ func (c *converter) ASLikeToFave(likeable Likeable) (*gtsmodel.StatusFave, error
}, nil
}
func (c *converter) ASAnnounceToStatus(announceable Announceable) (*gtsmodel.Status, bool, error) {
status := &gtsmodel.Status{}
isNew := true
// check if we already have the boost in the database
idProp := announceable.GetJSONLDId()
if idProp == nil || !idProp.IsIRI() {
return nil, isNew, errors.New("no id property set on announce, or was not an iri")
}
uri := idProp.GetIRI().String()
if err := c.db.GetWhere([]db.Where{{Key: "uri", Value: uri}}, status); err == nil {
// we already have it, great, just return it as-is :)
isNew = false
return status, isNew, nil
}
status.URI = uri
// get the URI of the announced/boosted status
boostedStatusURI, err := extractObject(announceable)
if err != nil {
return nil, isNew, fmt.Errorf("ASAnnounceToStatus: error getting object from announce: %s", err)
}
// set the URI on the new status for dereferencing later
status.GTSBoostedStatus = &gtsmodel.Status{
URI: boostedStatusURI.String(),
}
// get the published time for the announce
published, err := extractPublished(announceable)
if err != nil {
return nil, isNew, fmt.Errorf("ASAnnounceToStatus: error extracting published time: %s", err)
}
status.CreatedAt = published
status.UpdatedAt = published
// get the actor's IRI (ie., the person who boosted the status)
actor, err := extractActor(announceable)
if err != nil {
return nil, isNew, fmt.Errorf("ASAnnounceToStatus: error extracting actor: %s", err)
}
// get the boosting account based on the URI
// this should have been dereferenced already before we hit this point so we can confidently error out if we don't have it
boostingAccount := &gtsmodel.Account{}
if err := c.db.GetWhere([]db.Where{{Key: "uri", Value: actor.String()}}, boostingAccount); err != nil {
return nil, isNew, fmt.Errorf("ASAnnounceToStatus: error in db fetching account with uri %s: %s", actor.String(), err)
}
status.AccountID = boostingAccount.ID
// these will all be wrapped in the boosted status so set them empty here
status.Attachments = []string{}
status.Tags = []string{}
status.Mentions = []string{}
status.Emojis = []string{}
// parse the visibility from the To and CC entries
var visibility gtsmodel.Visibility
to, err := extractTos(announceable)
if err != nil {
return nil, isNew, fmt.Errorf("error extracting TO values: %s", err)
}
cc, err := extractCCs(announceable)
if err != nil {
return nil, isNew, fmt.Errorf("error extracting CC values: %s", err)
}
if len(to) == 0 && len(cc) == 0 {
return nil, isNew, errors.New("message wasn't TO or CC anyone")
}
// if it's CC'ed to public, it's public or unlocked
if isPublic(cc) {
visibility = gtsmodel.VisibilityUnlocked
}
if isPublic(to) {
visibility = gtsmodel.VisibilityPublic
}
// we should have a visibility by now
if visibility == "" {
return nil, isNew, errors.New("couldn't derive visibility")
}
status.Visibility = visibility
// the rest of the fields will be taken from the target status, but it's not our job to do the dereferencing here
return status, isNew, nil
}
func isPublic(tos []*url.URL) bool {
for _, entry := range tos {
if strings.EqualFold(entry.String(), "https://www.w3.org/ns/activitystreams#Public") {

View File

@ -100,6 +100,19 @@ type TypeConverter interface {
ASFollowToFollow(followable Followable) (*gtsmodel.Follow, error)
// ASLikeToFave converts a remote activitystreams 'like' representation into a gts model status fave.
ASLikeToFave(likeable Likeable) (*gtsmodel.StatusFave, error)
// ASAnnounceToStatus converts an activitystreams 'announce' into a status.
//
// The returned bool indicates whether this status is new (true) or not new (false).
//
// In other words, if the status is already in the database with the ID set on the announceable, then that will be returned,
// the returned bool will be false, and no further processing is necessary. If the returned bool is true, indicating
// that this is a new announce, then further processing will be necessary, because the returned status will be bareboned and
// require further dereferencing.
//
// This is useful when multiple users on an instance might receive the same boost, and we only want to process the boost once.
//
// NOTE -- this is different from one status being boosted multiple times! In this case, new boosts should indeed be created.
ASAnnounceToStatus(announceable Announceable) (status *gtsmodel.Status, new bool, err error)
/*
INTERNAL (gts) MODEL TO ACTIVITYSTREAMS MODEL