Follows and relationships (#27)
* Follows -- create and undo, both remote and local * Statuses -- federate new posts, including media, attachments, CWs and image descriptions.
This commit is contained in:
@ -20,6 +20,7 @@ package federation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
@ -37,6 +38,12 @@ import (
|
||||
"github.com/superseriousbusiness/gotosocial/internal/util"
|
||||
)
|
||||
|
||||
type FederatingDB interface {
|
||||
pub.Database
|
||||
Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error
|
||||
Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error
|
||||
}
|
||||
|
||||
// FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.
|
||||
// It doesn't care what the underlying implementation of the DB interface is, as long as it works.
|
||||
type federatingDB struct {
|
||||
@ -47,8 +54,8 @@ type federatingDB struct {
|
||||
typeConverter typeutils.TypeConverter
|
||||
}
|
||||
|
||||
// NewFederatingDB returns a pub.Database interface using the given database, config, and logger.
|
||||
func NewFederatingDB(db db.DB, config *config.Config, log *logrus.Logger) pub.Database {
|
||||
// NewFederatingDB returns a FederatingDB interface using the given database, config, and logger.
|
||||
func NewFederatingDB(db db.DB, config *config.Config, log *logrus.Logger) FederatingDB {
|
||||
return &federatingDB{
|
||||
locks: new(sync.Map),
|
||||
db: db,
|
||||
@ -204,7 +211,7 @@ func (f *federatingDB) Owns(c context.Context, id *url.URL) (bool, error) {
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error parsing statuses path for url %s: %s", id.String(), err)
|
||||
}
|
||||
if err := f.db.GetWhere("uri", uid, >smodel.Status{}); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: uid}}, >smodel.Status{}); err != nil {
|
||||
if _, ok := err.(db.ErrNoEntries); ok {
|
||||
// there are no entries for this status
|
||||
return false, nil
|
||||
@ -253,7 +260,7 @@ func (f *federatingDB) ActorForOutbox(c context.Context, outboxIRI *url.URL) (ac
|
||||
return nil, fmt.Errorf("%s is not an outbox URI", outboxIRI.String())
|
||||
}
|
||||
acct := >smodel.Account{}
|
||||
if err := f.db.GetWhere("outbox_uri", outboxIRI.String(), acct); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "outbox_uri", Value: outboxIRI.String()}}, acct); err != nil {
|
||||
if _, ok := err.(db.ErrNoEntries); ok {
|
||||
return nil, fmt.Errorf("no actor found that corresponds to outbox %s", outboxIRI.String())
|
||||
}
|
||||
@ -278,7 +285,7 @@ func (f *federatingDB) ActorForInbox(c context.Context, inboxIRI *url.URL) (acto
|
||||
return nil, fmt.Errorf("%s is not an inbox URI", inboxIRI.String())
|
||||
}
|
||||
acct := >smodel.Account{}
|
||||
if err := f.db.GetWhere("inbox_uri", inboxIRI.String(), acct); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "inbox_uri", Value: inboxIRI.String()}}, acct); err != nil {
|
||||
if _, ok := err.(db.ErrNoEntries); ok {
|
||||
return nil, fmt.Errorf("no actor found that corresponds to inbox %s", inboxIRI.String())
|
||||
}
|
||||
@ -304,7 +311,7 @@ func (f *federatingDB) OutboxForInbox(c context.Context, inboxIRI *url.URL) (out
|
||||
return nil, fmt.Errorf("%s is not an inbox URI", inboxIRI.String())
|
||||
}
|
||||
acct := >smodel.Account{}
|
||||
if err := f.db.GetWhere("inbox_uri", inboxIRI.String(), acct); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "inbox_uri", Value: inboxIRI.String()}}, acct); err != nil {
|
||||
if _, ok := err.(db.ErrNoEntries); ok {
|
||||
return nil, fmt.Errorf("no actor found that corresponds to inbox %s", inboxIRI.String())
|
||||
}
|
||||
@ -343,9 +350,10 @@ func (f *federatingDB) Get(c context.Context, id *url.URL) (value vocab.Type, er
|
||||
|
||||
if util.IsUserPath(id) {
|
||||
acct := >smodel.Account{}
|
||||
if err := f.db.GetWhere("uri", id.String(), acct); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: id.String()}}, acct); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.Debug("is user path! returning account")
|
||||
return f.typeConverter.AccountToAS(acct)
|
||||
}
|
||||
|
||||
@ -371,27 +379,40 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
|
||||
"asType": asType.GetTypeName(),
|
||||
},
|
||||
)
|
||||
l.Debugf("received CREATE asType %+v", asType)
|
||||
m, err := streams.Serialize(asType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.Debugf("received CREATE asType %s", string(b))
|
||||
|
||||
targetAcctI := ctx.Value(util.APAccount)
|
||||
if targetAcctI == nil {
|
||||
l.Error("target account wasn't set on context")
|
||||
return nil
|
||||
}
|
||||
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
|
||||
if !ok {
|
||||
l.Error("target account was set on context but couldn't be parsed")
|
||||
return nil
|
||||
}
|
||||
|
||||
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
|
||||
if fromFederatorChanI == nil {
|
||||
l.Error("from federator channel wasn't set on context")
|
||||
return nil
|
||||
}
|
||||
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
|
||||
if !ok {
|
||||
l.Error("from federator channel was set on context but couldn't be parsed")
|
||||
return nil
|
||||
}
|
||||
|
||||
switch gtsmodel.ActivityStreamsActivity(asType.GetTypeName()) {
|
||||
switch asType.GetTypeName() {
|
||||
case gtsmodel.ActivityStreamsCreate:
|
||||
create, ok := asType.(vocab.ActivityStreamsCreate)
|
||||
if !ok {
|
||||
@ -399,7 +420,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
|
||||
}
|
||||
object := create.GetActivityStreamsObject()
|
||||
for objectIter := object.Begin(); objectIter != object.End(); objectIter = objectIter.Next() {
|
||||
switch gtsmodel.ActivityStreamsObject(objectIter.GetType().GetTypeName()) {
|
||||
switch objectIter.GetType().GetTypeName() {
|
||||
case gtsmodel.ActivityStreamsNote:
|
||||
note := objectIter.GetActivityStreamsNote()
|
||||
status, err := f.typeConverter.ASStatusToStatus(note)
|
||||
@ -407,13 +428,17 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
|
||||
return fmt.Errorf("error converting note to status: %s", err)
|
||||
}
|
||||
if err := f.db.Put(status); err != nil {
|
||||
if _, ok := err.(db.ErrAlreadyExists); ok {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("database error inserting status: %s", err)
|
||||
}
|
||||
|
||||
fromFederatorChan <- gtsmodel.FromFederator{
|
||||
APObjectType: gtsmodel.ActivityStreamsNote,
|
||||
APActivityType: gtsmodel.ActivityStreamsCreate,
|
||||
GTSModel: status,
|
||||
APObjectType: gtsmodel.ActivityStreamsNote,
|
||||
APActivityType: gtsmodel.ActivityStreamsCreate,
|
||||
GTSModel: status,
|
||||
ReceivingAccount: targetAcct,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -433,7 +458,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
|
||||
}
|
||||
|
||||
if !targetAcct.Locked {
|
||||
if err := f.db.AcceptFollowRequest(followRequest.AccountID, followRequest.TargetAccountID); err != nil {
|
||||
if _, err := f.db.AcceptFollowRequest(followRequest.AccountID, followRequest.TargetAccountID); err != nil {
|
||||
return fmt.Errorf("database error accepting follow request: %s", err)
|
||||
}
|
||||
}
|
||||
@ -450,14 +475,87 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
|
||||
// the entire value.
|
||||
//
|
||||
// The library makes this call only after acquiring a lock first.
|
||||
func (f *federatingDB) Update(c context.Context, asType vocab.Type) error {
|
||||
func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
|
||||
l := f.log.WithFields(
|
||||
logrus.Fields{
|
||||
"func": "Update",
|
||||
"asType": asType.GetTypeName(),
|
||||
},
|
||||
)
|
||||
l.Debugf("received UPDATE asType %+v", asType)
|
||||
m, err := streams.Serialize(asType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.Debugf("received UPDATE asType %s", string(b))
|
||||
|
||||
receivingAcctI := ctx.Value(util.APAccount)
|
||||
if receivingAcctI == nil {
|
||||
l.Error("receiving account wasn't set on context")
|
||||
}
|
||||
receivingAcct, ok := receivingAcctI.(*gtsmodel.Account)
|
||||
if !ok {
|
||||
l.Error("receiving account was set on context but couldn't be parsed")
|
||||
}
|
||||
|
||||
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
|
||||
if fromFederatorChanI == nil {
|
||||
l.Error("from federator channel wasn't set on context")
|
||||
}
|
||||
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
|
||||
if !ok {
|
||||
l.Error("from federator channel was set on context but couldn't be parsed")
|
||||
}
|
||||
|
||||
switch asType.GetTypeName() {
|
||||
case gtsmodel.ActivityStreamsUpdate:
|
||||
update, ok := asType.(vocab.ActivityStreamsCreate)
|
||||
if !ok {
|
||||
return errors.New("could not convert type to create")
|
||||
}
|
||||
object := update.GetActivityStreamsObject()
|
||||
for objectIter := object.Begin(); objectIter != object.End(); objectIter = objectIter.Next() {
|
||||
switch objectIter.GetType().GetTypeName() {
|
||||
case string(gtsmodel.ActivityStreamsPerson):
|
||||
person := objectIter.GetActivityStreamsPerson()
|
||||
updatedAcct, err := f.typeConverter.ASRepresentationToAccount(person)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error converting person to account: %s", err)
|
||||
}
|
||||
if err := f.db.Put(updatedAcct); err != nil {
|
||||
return fmt.Errorf("database error inserting updated account: %s", err)
|
||||
}
|
||||
|
||||
fromFederatorChan <- gtsmodel.FromFederator{
|
||||
APObjectType: gtsmodel.ActivityStreamsProfile,
|
||||
APActivityType: gtsmodel.ActivityStreamsUpdate,
|
||||
GTSModel: updatedAcct,
|
||||
ReceivingAccount: receivingAcct,
|
||||
}
|
||||
|
||||
case string(gtsmodel.ActivityStreamsApplication):
|
||||
application := objectIter.GetActivityStreamsApplication()
|
||||
updatedAcct, err := f.typeConverter.ASRepresentationToAccount(application)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error converting person to account: %s", err)
|
||||
}
|
||||
if err := f.db.Put(updatedAcct); err != nil {
|
||||
return fmt.Errorf("database error inserting updated account: %s", err)
|
||||
}
|
||||
|
||||
fromFederatorChan <- gtsmodel.FromFederator{
|
||||
APObjectType: gtsmodel.ActivityStreamsProfile,
|
||||
APActivityType: gtsmodel.ActivityStreamsUpdate,
|
||||
GTSModel: updatedAcct,
|
||||
ReceivingAccount: receivingAcct,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -490,7 +588,7 @@ func (f *federatingDB) GetOutbox(c context.Context, outboxIRI *url.URL) (inbox v
|
||||
)
|
||||
l.Debug("entering GETOUTBOX function")
|
||||
|
||||
return nil, nil
|
||||
return streams.NewActivityStreamsOrderedCollectionPage(), nil
|
||||
}
|
||||
|
||||
// SetOutbox saves the outbox value given from GetOutbox, with new items
|
||||
@ -522,9 +620,60 @@ func (f *federatingDB) NewID(c context.Context, t vocab.Type) (id *url.URL, err
|
||||
"asType": t.GetTypeName(),
|
||||
},
|
||||
)
|
||||
l.Debugf("received NEWID request for asType %+v", t)
|
||||
m, err := streams.Serialize(t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.Debugf("received NEWID request for asType %s", string(b))
|
||||
|
||||
return url.Parse(fmt.Sprintf("%s://%s/", f.config.Protocol, uuid.NewString()))
|
||||
switch t.GetTypeName() {
|
||||
case gtsmodel.ActivityStreamsFollow:
|
||||
// FOLLOW
|
||||
// ID might already be set on a follow we've created, so check it here and return it if it is
|
||||
follow, ok := t.(vocab.ActivityStreamsFollow)
|
||||
if !ok {
|
||||
return nil, errors.New("newid: follow couldn't be parsed into vocab.ActivityStreamsFollow")
|
||||
}
|
||||
idProp := follow.GetJSONLDId()
|
||||
if idProp != nil {
|
||||
if idProp.IsIRI() {
|
||||
return idProp.GetIRI(), nil
|
||||
}
|
||||
}
|
||||
// it's not set so create one based on the actor set on the follow (ie., the followER not the followEE)
|
||||
actorProp := follow.GetActivityStreamsActor()
|
||||
if actorProp != nil {
|
||||
for iter := actorProp.Begin(); iter != actorProp.End(); iter = iter.Next() {
|
||||
// take the IRI of the first actor we can find (there should only be one)
|
||||
if iter.IsIRI() {
|
||||
actorAccount := >smodel.Account{}
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: iter.GetIRI().String()}}, actorAccount); err == nil { // if there's an error here, just use the fallback behavior -- we don't need to return an error here
|
||||
return url.Parse(util.GenerateURIForFollow(actorAccount.Username, f.config.Protocol, f.config.Host))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
case gtsmodel.ActivityStreamsNote:
|
||||
// NOTE aka STATUS
|
||||
// ID might already be set on a note we've created, so check it here and return it if it is
|
||||
note, ok := t.(vocab.ActivityStreamsNote)
|
||||
if !ok {
|
||||
return nil, errors.New("newid: follow couldn't be parsed into vocab.ActivityStreamsNote")
|
||||
}
|
||||
idProp := note.GetJSONLDId()
|
||||
if idProp != nil {
|
||||
if idProp.IsIRI() {
|
||||
return idProp.GetIRI(), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fallback default behavior: just return a random UUID after our protocol and host
|
||||
return url.Parse(fmt.Sprintf("%s://%s/%s", f.config.Protocol, f.config.Host, uuid.NewString()))
|
||||
}
|
||||
|
||||
// Followers obtains the Followers Collection for an actor with the
|
||||
@ -543,7 +692,7 @@ func (f *federatingDB) Followers(c context.Context, actorIRI *url.URL) (follower
|
||||
l.Debugf("entering FOLLOWERS function with actorIRI %s", actorIRI.String())
|
||||
|
||||
acct := >smodel.Account{}
|
||||
if err := f.db.GetWhere("uri", actorIRI.String(), acct); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: actorIRI.String()}}, acct); err != nil {
|
||||
return nil, fmt.Errorf("db error getting account with uri %s: %s", actorIRI.String(), err)
|
||||
}
|
||||
|
||||
@ -585,7 +734,7 @@ func (f *federatingDB) Following(c context.Context, actorIRI *url.URL) (followin
|
||||
l.Debugf("entering FOLLOWING function with actorIRI %s", actorIRI.String())
|
||||
|
||||
acct := >smodel.Account{}
|
||||
if err := f.db.GetWhere("uri", actorIRI.String(), acct); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: actorIRI.String()}}, acct); err != nil {
|
||||
return nil, fmt.Errorf("db error getting account with uri %s: %s", actorIRI.String(), err)
|
||||
}
|
||||
|
||||
@ -627,3 +776,139 @@ func (f *federatingDB) Liked(c context.Context, actorIRI *url.URL) (liked vocab.
|
||||
l.Debugf("entering LIKED function with actorIRI %s", actorIRI.String())
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
/*
|
||||
CUSTOM FUNCTIONALITY FOR GTS
|
||||
*/
|
||||
|
||||
func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error {
|
||||
l := f.log.WithFields(
|
||||
logrus.Fields{
|
||||
"func": "Undo",
|
||||
"asType": undo.GetTypeName(),
|
||||
},
|
||||
)
|
||||
m, err := streams.Serialize(undo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.Debugf("received UNDO asType %s", string(b))
|
||||
|
||||
targetAcctI := ctx.Value(util.APAccount)
|
||||
if targetAcctI == nil {
|
||||
l.Error("UNDO: target account wasn't set on context")
|
||||
return nil
|
||||
}
|
||||
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
|
||||
if !ok {
|
||||
l.Error("UNDO: target account was set on context but couldn't be parsed")
|
||||
return nil
|
||||
}
|
||||
|
||||
undoObject := undo.GetActivityStreamsObject()
|
||||
if undoObject == nil {
|
||||
return errors.New("UNDO: no object set on vocab.ActivityStreamsUndo")
|
||||
}
|
||||
|
||||
for iter := undoObject.Begin(); iter != undoObject.End(); iter = iter.Next() {
|
||||
switch iter.GetType().GetTypeName() {
|
||||
case string(gtsmodel.ActivityStreamsFollow):
|
||||
// UNDO FOLLOW
|
||||
ASFollow, ok := iter.GetType().(vocab.ActivityStreamsFollow)
|
||||
if !ok {
|
||||
return errors.New("UNDO: couldn't parse follow into vocab.ActivityStreamsFollow")
|
||||
}
|
||||
// make sure the actor owns the follow
|
||||
if !sameActor(undo.GetActivityStreamsActor(), ASFollow.GetActivityStreamsActor()) {
|
||||
return errors.New("UNDO: follow actor and activity actor not the same")
|
||||
}
|
||||
// convert the follow to something we can understand
|
||||
gtsFollow, err := f.typeConverter.ASFollowToFollow(ASFollow)
|
||||
if err != nil {
|
||||
return fmt.Errorf("UNDO: error converting asfollow to gtsfollow: %s", err)
|
||||
}
|
||||
// make sure the addressee of the original follow is the same as whatever inbox this landed in
|
||||
if gtsFollow.TargetAccountID != targetAcct.ID {
|
||||
return errors.New("UNDO: follow object account and inbox account were not the same")
|
||||
}
|
||||
// delete any existing FOLLOW
|
||||
if err := f.db.DeleteWhere([]db.Where{{Key: "uri", Value: gtsFollow.URI}}, >smodel.Follow{}); err != nil {
|
||||
return fmt.Errorf("UNDO: db error removing follow: %s", err)
|
||||
}
|
||||
// delete any existing FOLLOW REQUEST
|
||||
if err := f.db.DeleteWhere([]db.Where{{Key: "uri", Value: gtsFollow.URI}}, >smodel.FollowRequest{}); err != nil {
|
||||
return fmt.Errorf("UNDO: db error removing follow request: %s", err)
|
||||
}
|
||||
l.Debug("follow undone")
|
||||
return nil
|
||||
case string(gtsmodel.ActivityStreamsLike):
|
||||
// UNDO LIKE
|
||||
case string(gtsmodel.ActivityStreamsAnnounce):
|
||||
// UNDO BOOST/REBLOG/ANNOUNCE
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error {
|
||||
l := f.log.WithFields(
|
||||
logrus.Fields{
|
||||
"func": "Accept",
|
||||
"asType": accept.GetTypeName(),
|
||||
},
|
||||
)
|
||||
m, err := streams.Serialize(accept)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.Debugf("received ACCEPT asType %s", string(b))
|
||||
|
||||
inboxAcctI := ctx.Value(util.APAccount)
|
||||
if inboxAcctI == nil {
|
||||
l.Error("ACCEPT: inbox account wasn't set on context")
|
||||
return nil
|
||||
}
|
||||
inboxAcct, ok := inboxAcctI.(*gtsmodel.Account)
|
||||
if !ok {
|
||||
l.Error("ACCEPT: inbox account was set on context but couldn't be parsed")
|
||||
return nil
|
||||
}
|
||||
|
||||
acceptObject := accept.GetActivityStreamsObject()
|
||||
if acceptObject == nil {
|
||||
return errors.New("ACCEPT: no object set on vocab.ActivityStreamsUndo")
|
||||
}
|
||||
|
||||
for iter := acceptObject.Begin(); iter != acceptObject.End(); iter = iter.Next() {
|
||||
switch iter.GetType().GetTypeName() {
|
||||
case string(gtsmodel.ActivityStreamsFollow):
|
||||
// ACCEPT FOLLOW
|
||||
asFollow, ok := iter.GetType().(vocab.ActivityStreamsFollow)
|
||||
if !ok {
|
||||
return errors.New("ACCEPT: couldn't parse follow into vocab.ActivityStreamsFollow")
|
||||
}
|
||||
// convert the follow to something we can understand
|
||||
gtsFollow, err := f.typeConverter.ASFollowToFollow(asFollow)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ACCEPT: error converting asfollow to gtsfollow: %s", err)
|
||||
}
|
||||
// make sure the addressee of the original follow is the same as whatever inbox this landed in
|
||||
if gtsFollow.AccountID != inboxAcct.ID {
|
||||
return errors.New("ACCEPT: follow object account and inbox account were not the same")
|
||||
}
|
||||
_, err = f.db.AcceptFollowRequest(gtsFollow.AccountID, gtsFollow.TargetAccountID)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -124,7 +124,7 @@ func (f *federator) AuthenticatePostInbox(ctx context.Context, w http.ResponseWr
|
||||
}
|
||||
|
||||
requestingAccount := >smodel.Account{}
|
||||
if err := f.db.GetWhere("uri", publicKeyOwnerURI.String(), requestingAccount); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: publicKeyOwnerURI.String()}}, requestingAccount); err != nil {
|
||||
// there's been a proper error so return it
|
||||
if _, ok := err.(db.ErrNoEntries); !ok {
|
||||
return ctx, false, fmt.Errorf("error getting requesting account with public key id %s: %s", publicKeyOwnerURI.String(), err)
|
||||
@ -146,6 +146,22 @@ func (f *federator) AuthenticatePostInbox(ctx context.Context, w http.ResponseWr
|
||||
}
|
||||
|
||||
requestingAccount = a
|
||||
|
||||
// send the newly dereferenced account into the processor channel for further async processing
|
||||
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
|
||||
if fromFederatorChanI == nil {
|
||||
l.Error("from federator channel wasn't set on context")
|
||||
}
|
||||
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
|
||||
if !ok {
|
||||
l.Error("from federator channel was set on context but couldn't be parsed")
|
||||
}
|
||||
|
||||
fromFederatorChan <- gtsmodel.FromFederator{
|
||||
APObjectType: gtsmodel.ActivityStreamsProfile,
|
||||
APActivityType: gtsmodel.ActivityStreamsCreate,
|
||||
GTSModel: requestingAccount,
|
||||
}
|
||||
}
|
||||
|
||||
withRequester := context.WithValue(ctx, util.APRequestingAccount, requestingAccount)
|
||||
@ -184,7 +200,7 @@ func (f *federator) Blocked(ctx context.Context, actorIRIs []*url.URL) (bool, er
|
||||
|
||||
for _, uri := range actorIRIs {
|
||||
a := >smodel.Account{}
|
||||
if err := f.db.GetWhere("uri", uri.String(), a); err != nil {
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: uri.String()}}, a); err != nil {
|
||||
_, ok := err.(db.ErrNoEntries)
|
||||
if ok {
|
||||
// we don't have an entry for this account so it's not blocked
|
||||
@ -228,17 +244,19 @@ func (f *federator) FederatingCallbacks(ctx context.Context) (wrapped pub.Federa
|
||||
"func": "FederatingCallbacks",
|
||||
})
|
||||
|
||||
targetAcctI := ctx.Value(util.APAccount)
|
||||
if targetAcctI == nil {
|
||||
l.Error("target account wasn't set on context")
|
||||
receivingAcctI := ctx.Value(util.APAccount)
|
||||
if receivingAcctI == nil {
|
||||
l.Error("receiving account wasn't set on context")
|
||||
return
|
||||
}
|
||||
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
|
||||
receivingAcct, ok := receivingAcctI.(*gtsmodel.Account)
|
||||
if !ok {
|
||||
l.Error("target account was set on context but couldn't be parsed")
|
||||
l.Error("receiving account was set on context but couldn't be parsed")
|
||||
return
|
||||
}
|
||||
|
||||
var onFollow pub.OnFollowBehavior = pub.OnFollowAutomaticallyAccept
|
||||
if targetAcct.Locked {
|
||||
if receivingAcct.Locked {
|
||||
onFollow = pub.OnFollowDoNothing
|
||||
}
|
||||
|
||||
@ -248,6 +266,17 @@ func (f *federator) FederatingCallbacks(ctx context.Context) (wrapped pub.Federa
|
||||
OnFollow: onFollow,
|
||||
}
|
||||
|
||||
other = []interface{}{
|
||||
// override default undo behavior
|
||||
func(ctx context.Context, undo vocab.ActivityStreamsUndo) error {
|
||||
return f.FederatingDB().Undo(ctx, undo)
|
||||
},
|
||||
// override default accept behavior
|
||||
func(ctx context.Context, accept vocab.ActivityStreamsAccept) error {
|
||||
return f.FederatingDB().Accept(ctx, accept)
|
||||
},
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,8 @@ import (
|
||||
type Federator interface {
|
||||
// FederatingActor returns the underlying pub.FederatingActor, which can be used to send activities, and serve actors at inboxes/outboxes.
|
||||
FederatingActor() pub.FederatingActor
|
||||
// FederatingDB returns the underlying FederatingDB interface.
|
||||
FederatingDB() FederatingDB
|
||||
// AuthenticateFederatedRequest can be used to check the authenticity of incoming http-signed requests for federating resources.
|
||||
// The given username will be used to create a transport for making outgoing requests. See the implementation for more detailed comments.
|
||||
AuthenticateFederatedRequest(username string, r *http.Request) (*url.URL, error)
|
||||
@ -52,6 +54,7 @@ type Federator interface {
|
||||
type federator struct {
|
||||
config *config.Config
|
||||
db db.DB
|
||||
federatingDB FederatingDB
|
||||
clock pub.Clock
|
||||
typeConverter typeutils.TypeConverter
|
||||
transportController transport.Controller
|
||||
@ -60,18 +63,19 @@ type federator struct {
|
||||
}
|
||||
|
||||
// NewFederator returns a new federator
|
||||
func NewFederator(db db.DB, transportController transport.Controller, config *config.Config, log *logrus.Logger, typeConverter typeutils.TypeConverter) Federator {
|
||||
func NewFederator(db db.DB, federatingDB FederatingDB, transportController transport.Controller, config *config.Config, log *logrus.Logger, typeConverter typeutils.TypeConverter) Federator {
|
||||
|
||||
clock := &Clock{}
|
||||
f := &federator{
|
||||
config: config,
|
||||
db: db,
|
||||
federatingDB: federatingDB,
|
||||
clock: &Clock{},
|
||||
typeConverter: typeConverter,
|
||||
transportController: transportController,
|
||||
log: log,
|
||||
}
|
||||
actor := newFederatingActor(f, f, db.Federation(), clock)
|
||||
actor := newFederatingActor(f, f, federatingDB, clock)
|
||||
f.actor = actor
|
||||
return f
|
||||
}
|
||||
@ -79,3 +83,7 @@ func NewFederator(db db.DB, transportController transport.Controller, config *co
|
||||
func (f *federator) FederatingActor() pub.FederatingActor {
|
||||
return f.actor
|
||||
}
|
||||
|
||||
func (f *federator) FederatingDB() FederatingDB {
|
||||
return f.federatingDB
|
||||
}
|
||||
|
@ -89,7 +89,7 @@ func (suite *ProtocolTestSuite) TestPostInboxRequestBodyHook() {
|
||||
return nil, nil
|
||||
}))
|
||||
// setup module being tested
|
||||
federator := federation.NewFederator(suite.db, tc, suite.config, suite.log, suite.typeConverter)
|
||||
federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.config, suite.log, suite.typeConverter)
|
||||
|
||||
// setup request
|
||||
ctx := context.Background()
|
||||
@ -155,7 +155,7 @@ func (suite *ProtocolTestSuite) TestAuthenticatePostInbox() {
|
||||
}))
|
||||
|
||||
// now setup module being tested, with the mock transport controller
|
||||
federator := federation.NewFederator(suite.db, tc, suite.config, suite.log, suite.typeConverter)
|
||||
federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.config, suite.log, suite.typeConverter)
|
||||
|
||||
// setup request
|
||||
ctx := context.Background()
|
||||
|
@ -27,11 +27,13 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/go-fed/activity/pub"
|
||||
"github.com/go-fed/activity/streams"
|
||||
"github.com/go-fed/activity/streams/vocab"
|
||||
"github.com/go-fed/httpsig"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/db"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/transport"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
|
||||
@ -128,57 +130,73 @@ func (f *federator) AuthenticateFederatedRequest(username string, r *http.Reques
|
||||
return nil, fmt.Errorf("could not parse key id into a url: %s", err)
|
||||
}
|
||||
|
||||
transport, err := f.GetTransportForUser(username)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("transport err: %s", err)
|
||||
}
|
||||
var publicKey interface{}
|
||||
var pkOwnerURI *url.URL
|
||||
if strings.EqualFold(requestingPublicKeyID.Host, f.config.Host) {
|
||||
// the request is coming from INSIDE THE HOUSE so skip the remote dereferencing
|
||||
requestingLocalAccount := >smodel.Account{}
|
||||
if err := f.db.GetWhere([]db.Where{{Key: "public_key_uri", Value: requestingPublicKeyID.String()}}, requestingLocalAccount); err != nil {
|
||||
return nil, fmt.Errorf("couldn't get local account with public key uri %s from the database: %s", requestingPublicKeyID.String(), err)
|
||||
}
|
||||
publicKey = requestingLocalAccount.PublicKey
|
||||
pkOwnerURI, err = url.Parse(requestingLocalAccount.URI)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing url %s: %s", requestingLocalAccount.URI, err)
|
||||
}
|
||||
} else {
|
||||
// the request is remote, so we need to authenticate the request properly by dereferencing the remote key
|
||||
transport, err := f.GetTransportForUser(username)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("transport err: %s", err)
|
||||
}
|
||||
|
||||
// The actual http call to the remote server is made right here in the Dereference function.
|
||||
b, err := transport.Dereference(context.Background(), requestingPublicKeyID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error deferencing key %s: %s", requestingPublicKeyID.String(), err)
|
||||
}
|
||||
// The actual http call to the remote server is made right here in the Dereference function.
|
||||
b, err := transport.Dereference(context.Background(), requestingPublicKeyID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error deferencing key %s: %s", requestingPublicKeyID.String(), err)
|
||||
}
|
||||
|
||||
// if the key isn't in the response, we can't authenticate the request
|
||||
requestingPublicKey, err := getPublicKeyFromResponse(context.Background(), b, requestingPublicKeyID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting key %s from response %s: %s", requestingPublicKeyID.String(), string(b), err)
|
||||
}
|
||||
// if the key isn't in the response, we can't authenticate the request
|
||||
requestingPublicKey, err := getPublicKeyFromResponse(context.Background(), b, requestingPublicKeyID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting key %s from response %s: %s", requestingPublicKeyID.String(), string(b), err)
|
||||
}
|
||||
|
||||
// we should be able to get the actual key embedded in the vocab.W3IDSecurityV1PublicKey
|
||||
pkPemProp := requestingPublicKey.GetW3IDSecurityV1PublicKeyPem()
|
||||
if pkPemProp == nil || !pkPemProp.IsXMLSchemaString() {
|
||||
return nil, errors.New("publicKeyPem property is not provided or it is not embedded as a value")
|
||||
}
|
||||
// we should be able to get the actual key embedded in the vocab.W3IDSecurityV1PublicKey
|
||||
pkPemProp := requestingPublicKey.GetW3IDSecurityV1PublicKeyPem()
|
||||
if pkPemProp == nil || !pkPemProp.IsXMLSchemaString() {
|
||||
return nil, errors.New("publicKeyPem property is not provided or it is not embedded as a value")
|
||||
}
|
||||
|
||||
// and decode the PEM so that we can parse it as a golang public key
|
||||
pubKeyPem := pkPemProp.Get()
|
||||
block, _ := pem.Decode([]byte(pubKeyPem))
|
||||
if block == nil || block.Type != "PUBLIC KEY" {
|
||||
return nil, errors.New("could not decode publicKeyPem to PUBLIC KEY pem block type")
|
||||
}
|
||||
// and decode the PEM so that we can parse it as a golang public key
|
||||
pubKeyPem := pkPemProp.Get()
|
||||
block, _ := pem.Decode([]byte(pubKeyPem))
|
||||
if block == nil || block.Type != "PUBLIC KEY" {
|
||||
return nil, errors.New("could not decode publicKeyPem to PUBLIC KEY pem block type")
|
||||
}
|
||||
|
||||
p, err := x509.ParsePKIXPublicKey(block.Bytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse public key from block bytes: %s", err)
|
||||
publicKey, err = x509.ParsePKIXPublicKey(block.Bytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse public key from block bytes: %s", err)
|
||||
}
|
||||
|
||||
// all good! we just need the URI of the key owner to return
|
||||
pkOwnerProp := requestingPublicKey.GetW3IDSecurityV1Owner()
|
||||
if pkOwnerProp == nil || !pkOwnerProp.IsIRI() {
|
||||
return nil, errors.New("publicKeyOwner property is not provided or it is not embedded as a value")
|
||||
}
|
||||
pkOwnerURI = pkOwnerProp.GetIRI()
|
||||
}
|
||||
if p == nil {
|
||||
if publicKey == nil {
|
||||
return nil, errors.New("returned public key was empty")
|
||||
}
|
||||
|
||||
// do the actual authentication here!
|
||||
algo := httpsig.RSA_SHA256 // TODO: make this more robust
|
||||
if err := verifier.Verify(p, algo); err != nil {
|
||||
if err := verifier.Verify(publicKey, algo); err != nil {
|
||||
return nil, fmt.Errorf("error verifying key %s: %s", requestingPublicKeyID.String(), err)
|
||||
}
|
||||
|
||||
// all good! we just need the URI of the key owner to return
|
||||
pkOwnerProp := requestingPublicKey.GetW3IDSecurityV1Owner()
|
||||
if pkOwnerProp == nil || !pkOwnerProp.IsIRI() {
|
||||
return nil, errors.New("publicKeyOwner property is not provided or it is not embedded as a value")
|
||||
}
|
||||
pkOwnerURI := pkOwnerProp.GetIRI()
|
||||
|
||||
return pkOwnerURI, nil
|
||||
}
|
||||
|
||||
@ -217,6 +235,12 @@ func (f *federator) DereferenceRemoteAccount(username string, remoteAccountID *u
|
||||
return nil, errors.New("error resolving type as activitystreams application")
|
||||
}
|
||||
return p, nil
|
||||
case string(gtsmodel.ActivityStreamsService):
|
||||
p, ok := t.(vocab.ActivityStreamsService)
|
||||
if !ok {
|
||||
return nil, errors.New("error resolving type as activitystreams service")
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("type name %s not supported", t.GetTypeName())
|
||||
@ -243,3 +267,23 @@ func (f *federator) GetTransportForUser(username string) (transport.Transport, e
|
||||
}
|
||||
return transport, nil
|
||||
}
|
||||
|
||||
func sameActor(activityActor vocab.ActivityStreamsActorProperty, followActor vocab.ActivityStreamsActorProperty) bool {
|
||||
if activityActor == nil || followActor == nil {
|
||||
return false
|
||||
}
|
||||
for aIter := activityActor.Begin(); aIter != activityActor.End(); aIter = aIter.Next() {
|
||||
for fIter := followActor.Begin(); fIter != followActor.End(); fIter = fIter.Next() {
|
||||
if aIter.GetIRI() == nil {
|
||||
return false
|
||||
}
|
||||
if fIter.GetIRI() == nil {
|
||||
return false
|
||||
}
|
||||
if aIter.GetIRI().String() == fIter.GetIRI().String() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
Reference in New Issue
Block a user