tidy up some federation stuff

This commit is contained in:
tsmethurst 2021-04-28 21:46:13 +02:00
parent 621e59fd42
commit 211c43073f
11 changed files with 199 additions and 86 deletions

View File

@ -32,8 +32,13 @@ import (
type Distributor interface {
// FromClientAPI returns a channel for accepting messages that come from the gts client API.
FromClientAPI() chan FromClientAPI
// ClientAPIOut returns a channel for putting in messages that need to go to the gts client API.
// ToClientAPI returns a channel for putting in messages that need to go to the gts client API.
ToClientAPI() chan ToClientAPI
// FromFederator returns a channel for accepting messages that come from the federator (activitypub).
FromFederator() chan FromFederator
// ToFederator returns a channel for putting in messages that need to go to the federator (activitypub).
ToFederator() chan ToFederator
// Start starts the Distributor, reading from its channels and passing messages back and forth.
Start() error
// Stop stops the distributor cleanly, finishing handling any remaining messages before closing down.
@ -45,6 +50,8 @@ type distributor struct {
// federator pub.FederatingActor
fromClientAPI chan FromClientAPI
toClientAPI chan ToClientAPI
fromFederator chan FromFederator
toFederator chan ToFederator
stop chan interface{}
log *logrus.Logger
}
@ -55,21 +62,29 @@ func New(log *logrus.Logger) Distributor {
// federator: federator,
fromClientAPI: make(chan FromClientAPI, 100),
toClientAPI: make(chan ToClientAPI, 100),
fromFederator: make(chan FromFederator, 100),
toFederator: make(chan ToFederator, 100),
stop: make(chan interface{}),
log: log,
}
}
// ClientAPIIn returns a channel for accepting messages that come from the gts client API.
func (d *distributor) FromClientAPI() chan FromClientAPI {
return d.fromClientAPI
}
// ClientAPIOut returns a channel for putting in messages that need to go to the gts client API.
func (d *distributor) ToClientAPI() chan ToClientAPI {
return d.toClientAPI
}
func (d *distributor) FromFederator() chan FromFederator {
return d.fromFederator
}
func (d *distributor) ToFederator() chan ToFederator {
return d.toFederator
}
// Start starts the Distributor, reading from its channels and passing messages back and forth.
func (d *distributor) Start() error {
go func() {
@ -80,6 +95,10 @@ func (d *distributor) Start() error {
d.log.Infof("received message FROM client API: %+v", clientMsg)
case clientMsg := <-d.toClientAPI:
d.log.Infof("received message TO client API: %+v", clientMsg)
case federatorMsg := <-d.fromFederator:
d.log.Infof("received message FROM federator: %+v", federatorMsg)
case federatorMsg := <-d.toFederator:
d.log.Infof("received message TO federator: %+v", federatorMsg)
case <-d.stop:
break DistLoop
}
@ -108,3 +127,17 @@ type ToClientAPI struct {
APActivityType gtsmodel.ActivityStreamsActivity
Activity interface{}
}
// FromFederator wraps a message that travels from the federator into the distributor
type FromFederator struct {
APObjectType gtsmodel.ActivityStreamsObject
APActivityType gtsmodel.ActivityStreamsActivity
Activity interface{}
}
// ToFederator wraps a message that travels from the distributor into the federator
type ToFederator struct {
APObjectType gtsmodel.ActivityStreamsObject
APActivityType gtsmodel.ActivityStreamsActivity
Activity interface{}
}

View File

@ -24,21 +24,14 @@ import (
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/transport"
)
// New returns a go-fed compatible federating actor
func New(db db.DB, config *config.Config, log *logrus.Logger) pub.FederatingActor {
c := &Commoner{
db: db,
log: log,
config: config,
}
f := &Federator{
db: db,
log: log,
config: config,
}
return pub.NewFederatingActor(c, f, db.Federation(), &Clock{})
// NewFederatingActor returns a go-fed compatible federating actor
func NewFederatingActor(db db.DB, transportController transport.Controller, config *config.Config, log *logrus.Logger) pub.FederatingActor {
c := NewCommonBehavior(db, log, config, transportController)
f := NewFederatingProtocol(db, log, config, transportController)
pubDatabase := db.Federation()
clock := &Clock{}
return pub.NewFederatingActor(c, f, pubDatabase, clock)
}

View File

@ -20,6 +20,7 @@ package federation
import (
"context"
"fmt"
"net/http"
"net/url"
@ -28,13 +29,27 @@ import (
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/db/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/transport"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
// Commoner implements the go-fed common behavior interface
type Commoner struct {
db db.DB
log *logrus.Logger
config *config.Config
// commonBehavior implements the go-fed common behavior interface
type commonBehavior struct {
db db.DB
log *logrus.Logger
config *config.Config
transportController transport.Controller
}
// NewCommonBehavior returns an implementation of the pub.CommonBehavior interface that uses the given db, log, config, and transportController
func NewCommonBehavior(db db.DB, log *logrus.Logger, config *config.Config, transportController transport.Controller) pub.CommonBehavior {
return &commonBehavior{
db: db,
log: log,
config: config,
transportController: transportController,
}
}
/*
@ -63,7 +78,7 @@ type Commoner struct {
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
func (c *Commoner) AuthenticateGetInbox(ctx context.Context, w http.ResponseWriter, r *http.Request) (context.Context, bool, error) {
func (c *commonBehavior) AuthenticateGetInbox(ctx context.Context, w http.ResponseWriter, r *http.Request) (context.Context, bool, error) {
// TODO
// use context.WithValue() and context.Value() to set and get values through here
return nil, false, nil
@ -88,7 +103,7 @@ func (c *Commoner) AuthenticateGetInbox(ctx context.Context, w http.ResponseWrit
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
func (c *Commoner) AuthenticateGetOutbox(ctx context.Context, w http.ResponseWriter, r *http.Request) (context.Context, bool, error) {
func (c *commonBehavior) AuthenticateGetOutbox(ctx context.Context, w http.ResponseWriter, r *http.Request) (context.Context, bool, error) {
// TODO
return nil, false, nil
}
@ -101,7 +116,7 @@ func (c *Commoner) AuthenticateGetOutbox(ctx context.Context, w http.ResponseWri
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
func (c *Commoner) GetOutbox(ctx context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
func (c *commonBehavior) GetOutbox(ctx context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
// TODO
return nil, nil
}
@ -129,34 +144,29 @@ func (c *Commoner) GetOutbox(ctx context.Context, r *http.Request) (vocab.Activi
// Note that the library will not maintain a long-lived pointer to the
// returned Transport so that any private credentials are able to be
// garbage collected.
func (c *Commoner) NewTransport(ctx context.Context, actorBoxIRI *url.URL, gofedAgent string) (pub.Transport, error) {
// TODO
// prefs := []httpsig.Algorithm{httpsig.RSA_SHA256}
// digestPref := httpsig.DigestSha256
// getHeadersToSign := []string{httpsig.RequestTarget, "Date"}
// postHeadersToSign := []string{httpsig.RequestTarget, "Date", "Digest"}
// // Using github.com/go-fed/httpsig for HTTP Signatures:
// getSigner, _, err := httpsig.NewSigner(prefs, digestPref, getHeadersToSign, httpsig.Signature)
// if err != nil {
// return nil, err
// }
// postSigner, _, err := httpsig.NewSigner(prefs, digestPref, postHeadersToSign, httpsig.Signature)
// if err != nil {
// return nil, err
// }
// pubKeyId, privKey, err := s.getKeysForActorBoxIRI(actorBoxIRI)
// client := &http.Client{
// Timeout: time.Second * 30,
// }
// t := pub.NewHttpSigTransport(
// client,
// f.config.Host,
// &Clock{},
// getSigner,
// postSigner,
// pubKeyId,
// privKey)
func (c *commonBehavior) NewTransport(ctx context.Context, actorBoxIRI *url.URL, gofedAgent string) (pub.Transport, error) {
return nil, nil
var username string
var err error
if util.IsInboxPath(actorBoxIRI) {
username, err = util.ParseInboxPath(actorBoxIRI)
if err != nil {
return nil, fmt.Errorf("couldn't parse path %s as an inbox: %s", actorBoxIRI.String(), err)
}
} else if util.IsOutboxPath(actorBoxIRI) {
username, err = util.ParseOutboxPath(actorBoxIRI)
if err != nil {
return nil, fmt.Errorf("couldn't parse path %s as an outbox: %s", actorBoxIRI.String(), err)
}
} else {
return nil, fmt.Errorf("id %s was neither an inbox path nor an outbox path", actorBoxIRI.String())
}
account := &gtsmodel.Account{}
if err := c.db.GetWhere("username", username, account); err != nil {
return nil, err
}
return c.transportController.NewTransport(account.PublicKeyURI, account.PrivateKey)
}

View File

@ -0,0 +1,41 @@
/*
GoToSocial
Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package federation
import (
"github.com/go-fed/activity/pub"
"github.com/superseriousbusiness/gotosocial/internal/distributor"
)
// Federator wraps everything needed to manage activitypub federation from gotosocial
type Federator interface {
}
type federator struct {
actor pub.FederatingActor
distributor distributor.Distributor
}
// NewFederator returns a new federator that uses the given actor and distributor
func NewFederator(actor pub.FederatingActor, distributor distributor.Distributor) Federator {
return &federator{
actor: actor,
distributor: distributor,
}
}

View File

@ -34,17 +34,17 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/util"
)
// Federator implements the go-fed federating protocol interface
type Federator struct {
// FederatingProtocol implements the go-fed federating protocol interface
type FederatingProtocol struct {
db db.DB
log *logrus.Logger
config *config.Config
transportController transport.Controller
}
// NewFederator returns the gotosocial implementation of the go-fed FederatingProtocol interface
func NewFederator(db db.DB, log *logrus.Logger, config *config.Config, transportController transport.Controller) pub.FederatingProtocol {
return &Federator{
// NewFederatingProtocol returns the gotosocial implementation of the go-fed FederatingProtocol interface
func NewFederatingProtocol(db db.DB, log *logrus.Logger, config *config.Config, transportController transport.Controller) pub.FederatingProtocol {
return &FederatingProtocol{
db: db,
log: log,
config: config,
@ -78,7 +78,7 @@ func NewFederator(db db.DB, log *logrus.Logger, config *config.Config, transport
// PostInbox. In this case, the DelegateActor implementation must not
// write a response to the ResponseWriter as is expected that the caller
// to PostInbox will do so when handling the error.
func (f *Federator) PostInboxRequestBodyHook(ctx context.Context, r *http.Request, activity pub.Activity) (context.Context, error) {
func (f *FederatingProtocol) PostInboxRequestBodyHook(ctx context.Context, r *http.Request, activity pub.Activity) (context.Context, error) {
l := f.log.WithFields(logrus.Fields{
"func": "PostInboxRequestBodyHook",
"useragent": r.UserAgent(),
@ -128,7 +128,7 @@ func (f *Federator) PostInboxRequestBodyHook(ctx context.Context, r *http.Reques
// Finally, if the authentication and authorization succeeds, then
// authenticated must be true and error nil. The request will continue
// to be processed.
func (f *Federator) AuthenticatePostInbox(ctx context.Context, w http.ResponseWriter, r *http.Request) (context.Context, bool, error) {
func (f *FederatingProtocol) AuthenticatePostInbox(ctx context.Context, w http.ResponseWriter, r *http.Request) (context.Context, bool, error) {
l := f.log.WithFields(logrus.Fields{
"func": "AuthenticatePostInbox",
"useragent": r.UserAgent(),
@ -167,7 +167,7 @@ func (f *Federator) AuthenticatePostInbox(ctx context.Context, w http.ResponseWr
// Finally, if the authentication and authorization succeeds, then
// blocked must be false and error nil. The request will continue
// to be processed.
func (f *Federator) Blocked(ctx context.Context, actorIRIs []*url.URL) (bool, error) {
func (f *FederatingProtocol) Blocked(ctx context.Context, actorIRIs []*url.URL) (bool, error) {
// TODO
return false, nil
}
@ -191,7 +191,7 @@ func (f *Federator) Blocked(ctx context.Context, actorIRIs []*url.URL) (bool, er
//
// Applications are not expected to handle every single ActivityStreams
// type and extension. The unhandled ones are passed to DefaultCallback.
func (f *Federator) FederatingCallbacks(ctx context.Context) (pub.FederatingWrappedCallbacks, []interface{}, error) {
func (f *FederatingProtocol) FederatingCallbacks(ctx context.Context) (pub.FederatingWrappedCallbacks, []interface{}, error) {
// TODO
return pub.FederatingWrappedCallbacks{}, nil, nil
}
@ -203,7 +203,7 @@ func (f *Federator) FederatingCallbacks(ctx context.Context) (pub.FederatingWrap
// Applications are not expected to handle every single ActivityStreams
// type and extension, so the unhandled ones are passed to
// DefaultCallback.
func (f *Federator) DefaultCallback(ctx context.Context, activity pub.Activity) error {
func (f *FederatingProtocol) DefaultCallback(ctx context.Context, activity pub.Activity) error {
l := f.log.WithFields(logrus.Fields{
"func": "DefaultCallback",
"aptype": activity.GetTypeName(),
@ -216,7 +216,7 @@ func (f *Federator) DefaultCallback(ctx context.Context, activity pub.Activity)
// an activity to determine if inbox forwarding needs to occur.
//
// Zero or negative numbers indicate infinite recursion.
func (f *Federator) MaxInboxForwardingRecursionDepth(ctx context.Context) int {
func (f *FederatingProtocol) MaxInboxForwardingRecursionDepth(ctx context.Context) int {
// TODO
return 0
}
@ -226,7 +226,7 @@ func (f *Federator) MaxInboxForwardingRecursionDepth(ctx context.Context) int {
// delivery.
//
// Zero or negative numbers indicate infinite recursion.
func (f *Federator) MaxDeliveryRecursionDepth(ctx context.Context) int {
func (f *FederatingProtocol) MaxDeliveryRecursionDepth(ctx context.Context) int {
// TODO
return 0
}
@ -238,7 +238,7 @@ func (f *Federator) MaxDeliveryRecursionDepth(ctx context.Context) int {
//
// The activity is provided as a reference for more intelligent
// logic to be used, but the implementation must not modify it.
func (f *Federator) FilterForwarding(ctx context.Context, potentialRecipients []*url.URL, a pub.Activity) ([]*url.URL, error) {
func (f *FederatingProtocol) FilterForwarding(ctx context.Context, potentialRecipients []*url.URL, a pub.Activity) ([]*url.URL, error) {
// TODO
return nil, nil
}
@ -251,7 +251,7 @@ func (f *Federator) FilterForwarding(ctx context.Context, potentialRecipients []
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
func (f *Federator) GetInbox(ctx context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
func (f *FederatingProtocol) GetInbox(ctx context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
// TODO
return nil, nil
}

View File

@ -83,7 +83,7 @@ func (suite *ProtocolTestSuite) TestPostInboxRequestBodyHook() {
return nil, nil
}))
// setup module being tested
federator := federation.NewFederator(suite.db, suite.log, suite.config, tc).(*federation.Federator)
federator := federation.NewFederatingProtocol(suite.db, suite.log, suite.config, tc).(*federation.FederatingProtocol)
// setup request
ctx := context.Background()
@ -156,7 +156,7 @@ func (suite *ProtocolTestSuite) TestAuthenticatePostInbox() {
}))
// now setup module being tested, with the mock transport controller
federator := federation.NewFederator(suite.db, suite.log, suite.config, tc)
federator := federation.NewFederatingProtocol(suite.db, suite.log, suite.config, tc)
// setup request
ctx := context.Background()

View File

@ -21,6 +21,7 @@ package gotosocial
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
@ -45,6 +46,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/router"
"github.com/superseriousbusiness/gotosocial/internal/storage"
"github.com/superseriousbusiness/gotosocial/internal/transport"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
)
@ -72,6 +74,9 @@ var Run action.GTSAction = func(ctx context.Context, c *config.Config, log *logr
if err := distributor.Start(); err != nil {
return fmt.Errorf("error starting distributor: %s", err)
}
transportController := transport.NewController(c, &federation.Clock{}, http.DefaultClient, log)
federatingActor := federation.NewFederatingActor(dbService, transportController, c, log)
federator := federation.NewFederator(federatingActor, distributor)
// build converters and util
ic := typeutils.NewConverter(c, dbService)
@ -113,7 +118,7 @@ var Run action.GTSAction = func(ctx context.Context, c *config.Config, log *logr
return fmt.Errorf("error creating instance account: %s", err)
}
gts, err := New(dbService, &cache.MockCache{}, router, federation.New(dbService, c, log), c)
gts, err := New(dbService, &cache.MockCache{}, router, federator, c)
if err != nil {
return fmt.Errorf("error creating gotosocial service: %s", err)
}

View File

@ -21,10 +21,10 @@ package gotosocial
import (
"context"
"github.com/go-fed/activity/pub"
"github.com/superseriousbusiness/gotosocial/internal/cache"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/router"
)
@ -38,23 +38,23 @@ type Gotosocial interface {
// New returns a new gotosocial server, initialized with the given configuration.
// An error will be returned the caller if something goes wrong during initialization
// eg., no db or storage connection, port for router already in use, etc.
func New(db db.DB, cache cache.Cache, apiRouter router.Router, federationAPI pub.FederatingActor, config *config.Config) (Gotosocial, error) {
func New(db db.DB, cache cache.Cache, apiRouter router.Router, federator federation.Federator, config *config.Config) (Gotosocial, error) {
return &gotosocial{
db: db,
cache: cache,
apiRouter: apiRouter,
federationAPI: federationAPI,
config: config,
db: db,
cache: cache,
apiRouter: apiRouter,
federator: federator,
config: config,
}, nil
}
// gotosocial fulfils the gotosocial interface.
type gotosocial struct {
db db.DB
cache cache.Cache
apiRouter router.Router
federationAPI pub.FederatingActor
config *config.Config
db db.DB
cache cache.Cache
apiRouter router.Router
federator federation.Federator
config *config.Config
}
// Start starts up the gotosocial server. If something goes wrong

View File

@ -64,6 +64,10 @@ var (
// inboxPathRegex parses a path that validates and captures the username part from eg /users/example_username/inbox
inboxPathRegex = regexp.MustCompile(inboxPathRegexString)
outboxPathRegexString = fmt.Sprintf(`^/?%s/(%s)/%s$`, UsersPath, usernameRegexString, OutboxPath)
// outboxPathRegex parses a path that validates and captures the username part from eg /users/example_username/outbox
outboxPathRegex = regexp.MustCompile(outboxPathRegexString)
actorPathRegexString = fmt.Sprintf(`^?/%s/(%s)$`, ActorsPath, usernameRegexString)
// actorPathRegex parses a path that validates and captures the username part from eg /actors/example_username
actorPathRegex = regexp.MustCompile(actorPathRegexString)

View File

@ -137,6 +137,11 @@ func IsInboxPath(id *url.URL) bool {
return inboxPathRegex.MatchString(strings.ToLower(id.Path))
}
// IsOutboxPath returns true if the given URL path corresponds to eg /users/example_username/outbox
func IsOutboxPath(id *url.URL) bool {
return outboxPathRegex.MatchString(strings.ToLower(id.Path))
}
// IsInstanceActorPath returns true if the given URL path corresponds to eg /actors/example_username
func IsInstanceActorPath(id *url.URL) bool {
return actorPathRegex.MatchString(strings.ToLower(id.Path))
@ -195,3 +200,14 @@ func ParseInboxPath(id *url.URL) (username string, err error) {
username = matches[1]
return
}
// ParseOutboxPath returns the username from a path such as /users/example_username/outbox
func ParseOutboxPath(id *url.URL) (username string, err error) {
matches := outboxPathRegex.FindStringSubmatch(id.Path)
if len(matches) != 2 {
err = fmt.Errorf("expected 2 matches but matches length was %d", len(matches))
return
}
username = matches[1]
return
}

View File

@ -19,8 +19,11 @@
package testrig
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"syscall"
@ -44,6 +47,7 @@ import (
// Run creates and starts a gotosocial testrig server
var Run action.GTSAction = func(ctx context.Context, _ *config.Config, log *logrus.Logger) error {
c := NewTestConfig()
dbService := NewTestDB()
router := NewTestRouter()
storageBackend := NewTestStorage()
@ -54,8 +58,15 @@ var Run action.GTSAction = func(ctx context.Context, _ *config.Config, log *logr
return fmt.Errorf("error starting distributor: %s", err)
}
mastoConverter := NewTestTypeConverter(dbService)
c := NewTestConfig()
transportController := NewTestTransportController(NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {
r := ioutil.NopCloser(bytes.NewReader([]byte{}))
return &http.Response{
StatusCode: 200,
Body: r,
}, nil
}))
federatingActor := federation.NewFederatingActor(dbService, transportController, c, log)
federator := federation.NewFederator(federatingActor, distributor)
StandardDBSetup(dbService)
StandardStorageSetup(storageBackend, "./testrig/media")
@ -97,7 +108,7 @@ var Run action.GTSAction = func(ctx context.Context, _ *config.Config, log *logr
// return fmt.Errorf("error creating instance account: %s", err)
// }
gts, err := gotosocial.New(dbService, &cache.MockCache{}, router, federation.New(dbService, c, log), c)
gts, err := gotosocial.New(dbService, &cache.MockCache{}, router, federator, c)
if err != nil {
return fmt.Errorf("error creating gotosocial service: %s", err)
}