From 8618c539f12fbbb2f82211f63b3970fa7938d635 Mon Sep 17 00:00:00 2001 From: tsmethurst Date: Thu, 17 Jun 2021 19:20:08 +0200 Subject: [PATCH] start messing about with streaming api --- go.mod | 1 + internal/api/client/streaming/stream.go | 46 ++++++++++++++ internal/api/client/streaming/streaming.go | 62 +++++++++++++++++++ internal/oauth/server.go | 5 ++ internal/processing/processor.go | 14 ++++- internal/processing/streaming.go | 15 +++++ .../synchronous/streaming/authorize.go | 33 ++++++++++ .../synchronous/streaming/stream.go | 22 +++++++ .../synchronous/streaming/streaming.go | 45 ++++++++++++++ 9 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 internal/api/client/streaming/stream.go create mode 100644 internal/api/client/streaming/streaming.go create mode 100644 internal/processing/streaming.go create mode 100644 internal/processing/synchronous/streaming/authorize.go create mode 100644 internal/processing/synchronous/streaming/stream.go create mode 100644 internal/processing/synchronous/streaming/streaming.go diff --git a/go.mod b/go.mod index aec00d3..81e2c4d 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/golang/mock v1.5.0 // indirect github.com/google/uuid v1.2.0 github.com/gorilla/sessions v1.2.1 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/h2non/filetype v1.1.1 github.com/json-iterator/go v1.1.11 // indirect github.com/leodido/go-urn v1.2.1 // indirect diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go new file mode 100644 index 0000000..c0097d0 --- /dev/null +++ b/internal/api/client/streaming/stream.go @@ -0,0 +1,46 @@ +package streaming + +import ( + "fmt" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" +) + +func (m *Module) StreamGETHandler(c *gin.Context) { + streamType := c.Query(StreamQueryKey) + if streamType == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("no stream type provided under query key %s", StreamQueryKey)}) + return + } + + accessToken := c.Query(AccessTokenQueryKey) + if accessToken == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": fmt.Sprintf("no access token provided under query key %s", AccessTokenQueryKey)}) + return + } + + account, err := m.processor.AuthorizeStreamingRequest(accessToken) + if err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "could not authorize with given token"}) + return + } + + upgrader := websocket.Upgrader{ + HandshakeTimeout: 5 * time.Second, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + Subprotocols: []string{"wss"}, + } + + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + return + } + + if errWithCode := m.processor.StreamForAccount(conn, account, streamType); errWithCode != nil { + c.JSON(errWithCode.Code(), errWithCode.Safe()) + } +} diff --git a/internal/api/client/streaming/streaming.go b/internal/api/client/streaming/streaming.go new file mode 100644 index 0000000..5a7cee9 --- /dev/null +++ b/internal/api/client/streaming/streaming.go @@ -0,0 +1,62 @@ +/* + GoToSocial + Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +package streaming + +import ( + "net/http" + + "github.com/sirupsen/logrus" + "github.com/superseriousbusiness/gotosocial/internal/api" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/processing" + "github.com/superseriousbusiness/gotosocial/internal/router" +) + +const ( + // BasePath is the path for the streaming api + BasePath = "/api/v1/streaming" + + // StreamQueryKey is the query key for the type of stream being requested + StreamQueryKey = "stream" + + // AccessTokenQueryKey + AccessTokenQueryKey = "access_token" +) + +// Module implements the api.ClientModule interface for everything related to streaming +type Module struct { + config *config.Config + processor processing.Processor + log *logrus.Logger +} + +// New returns a new streaming module +func New(config *config.Config, processor processing.Processor, log *logrus.Logger) api.ClientModule { + return &Module{ + config: config, + processor: processor, + log: log, + } +} + +// Route attaches all routes from this module to the given router +func (m *Module) Route(r router.Router) error { + r.AttachHandler(http.MethodGet, BasePath, m.StreamGETHandler) + return nil +} diff --git a/internal/oauth/server.go b/internal/oauth/server.go index fb84743..1289b18 100644 --- a/internal/oauth/server.go +++ b/internal/oauth/server.go @@ -56,6 +56,7 @@ type Server interface { HandleAuthorizeRequest(w http.ResponseWriter, r *http.Request) error ValidationBearerToken(r *http.Request) (oauth2.TokenInfo, error) GenerateUserAccessToken(ti oauth2.TokenInfo, clientSecret string, userID string) (accessToken oauth2.TokenInfo, err error) + LoadAccessToken(ctx context.Context, access string) (accessToken oauth2.TokenInfo, err error) } // s fulfils the Server interface using the underlying oauth2 server @@ -171,3 +172,7 @@ func (s *s) GenerateUserAccessToken(ti oauth2.TokenInfo, clientSecret string, us s.log.Tracef("obtained user-level access token: %+v", accessToken) return accessToken, nil } + +func (s *s) LoadAccessToken(ctx context.Context, access string) (accessToken oauth2.TokenInfo, err error) { + return s.server.Manager.LoadAccessToken(ctx, access) +} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 301cb57..396f0a2 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -22,6 +22,7 @@ import ( "context" "net/http" + "github.com/gorilla/websocket" "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/blob" @@ -33,6 +34,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/processing/synchronous/status" + "github.com/superseriousbusiness/gotosocial/internal/processing/synchronous/streaming" "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" @@ -132,6 +134,11 @@ type Processor interface { // PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, gtserror.WithCode) + // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API + AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) + // StreamForAccount streams to websocket connection c for an account, with the given streamType. + StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode + /* FEDERATION API-FACING PROCESSING FUNCTIONS These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply @@ -192,7 +199,8 @@ type processor struct { SUB-PROCESSORS */ - statusProcessor status.Processor + statusProcessor status.Processor + streamingProcessor streaming.Processor } // NewProcessor returns a new Processor that uses the given federator and logger @@ -202,6 +210,7 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f fromFederator := make(chan gtsmodel.FromFederator, 1000) statusProcessor := status.New(db, tc, config, fromClientAPI, log) + streamingProcessor := streaming.New(db, tc, oauthServer, config, log) return &processor{ fromClientAPI: fromClientAPI, @@ -218,7 +227,8 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f db: db, filter: visibility.NewFilter(db, log), - statusProcessor: statusProcessor, + statusProcessor: statusProcessor, + streamingProcessor: streamingProcessor, } } diff --git a/internal/processing/streaming.go b/internal/processing/streaming.go new file mode 100644 index 0000000..80ca1bd --- /dev/null +++ b/internal/processing/streaming.go @@ -0,0 +1,15 @@ +package processing + +import ( + "github.com/gorilla/websocket" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) { + return p.streamingProcessor.AuthorizeStreamingRequest(accessToken) +} + +func (p *processor) StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode { + return p.streamingProcessor.StreamForAccount(c, account, streamType) +} diff --git a/internal/processing/synchronous/streaming/authorize.go b/internal/processing/synchronous/streaming/authorize.go new file mode 100644 index 0000000..8bbf185 --- /dev/null +++ b/internal/processing/synchronous/streaming/authorize.go @@ -0,0 +1,33 @@ +package streaming + +import ( + "context" + "fmt" + + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) { + ti, err := p.oauthServer.LoadAccessToken(context.Background(), accessToken) + if err != nil { + return nil, fmt.Errorf("AuthorizeStreamingRequest: error loading access token: %s", err) + } + + uid := ti.GetUserID() + if uid == "" { + return nil, fmt.Errorf("AuthorizeStreamingRequest: no userid in token") + } + + // fetch user's and account for this user id + user := >smodel.User{} + if err := p.db.GetByID(uid, user); err != nil || user == nil { + return nil, fmt.Errorf("AuthorizeStreamingRequest: no user found for validated uid %s", uid) + } + + acct := >smodel.Account{} + if err := p.db.GetByID(user.AccountID, acct); err != nil || acct == nil { + return nil, fmt.Errorf("AuthorizeStreamingRequest: no account retrieved for user with id %s", uid) + } + + return acct, nil +} diff --git a/internal/processing/synchronous/streaming/stream.go b/internal/processing/synchronous/streaming/stream.go new file mode 100644 index 0000000..e2bfaad --- /dev/null +++ b/internal/processing/synchronous/streaming/stream.go @@ -0,0 +1,22 @@ +package streaming + +import ( + "github.com/gorilla/websocket" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode { + + v, loaded := p.streamMap.LoadOrStore(account.ID, sync.Slice) + if loaded { + + } + + return nil +} + +type streams struct { + accountID string + +} diff --git a/internal/processing/synchronous/streaming/streaming.go b/internal/processing/synchronous/streaming/streaming.go new file mode 100644 index 0000000..207c1ec --- /dev/null +++ b/internal/processing/synchronous/streaming/streaming.go @@ -0,0 +1,45 @@ +package streaming + +import ( + "sync" + + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +// Processor wraps a bunch of functions for processing streaming. +type Processor interface { + // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API + AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) + StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode +} + +type processor struct { + tc typeutils.TypeConverter + config *config.Config + db db.DB + filter visibility.Filter + log *logrus.Logger + oauthServer oauth.Server + streamMap *sync.Map +} + +// New returns a new status processor. +func New(db db.DB, tc typeutils.TypeConverter, oauthServer oauth.Server, config *config.Config, log *logrus.Logger) Processor { + return &processor{ + tc: tc, + config: config, + db: db, + filter: visibility.NewFilter(db, log), + log: log, + oauthServer: oauthServer, + streamMap: &sync.Map{}, + } +}