start messing about with streaming api

This commit is contained in:
tsmethurst 2021-06-17 19:20:08 +02:00 committed by tsmethurst
parent ad2e982edc
commit 8618c539f1
9 changed files with 241 additions and 2 deletions

1
go.mod
View File

@ -26,6 +26,7 @@ require (
github.com/golang/mock v1.5.0 // indirect github.com/golang/mock v1.5.0 // indirect
github.com/google/uuid v1.2.0 github.com/google/uuid v1.2.0
github.com/gorilla/sessions v1.2.1 // indirect github.com/gorilla/sessions v1.2.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/h2non/filetype v1.1.1 github.com/h2non/filetype v1.1.1
github.com/json-iterator/go v1.1.11 // indirect github.com/json-iterator/go v1.1.11 // indirect
github.com/leodido/go-urn v1.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect

View File

@ -0,0 +1,46 @@
package streaming
import (
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
func (m *Module) StreamGETHandler(c *gin.Context) {
streamType := c.Query(StreamQueryKey)
if streamType == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("no stream type provided under query key %s", StreamQueryKey)})
return
}
accessToken := c.Query(AccessTokenQueryKey)
if accessToken == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": fmt.Sprintf("no access token provided under query key %s", AccessTokenQueryKey)})
return
}
account, err := m.processor.AuthorizeStreamingRequest(accessToken)
if err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "could not authorize with given token"})
return
}
upgrader := websocket.Upgrader{
HandshakeTimeout: 5 * time.Second,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Subprotocols: []string{"wss"},
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
if errWithCode := m.processor.StreamForAccount(conn, account, streamType); errWithCode != nil {
c.JSON(errWithCode.Code(), errWithCode.Safe())
}
}

View File

@ -0,0 +1,62 @@
/*
GoToSocial
Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package streaming
import (
"net/http"
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/api"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/router"
)
const (
// BasePath is the path for the streaming api
BasePath = "/api/v1/streaming"
// StreamQueryKey is the query key for the type of stream being requested
StreamQueryKey = "stream"
// AccessTokenQueryKey
AccessTokenQueryKey = "access_token"
)
// Module implements the api.ClientModule interface for everything related to streaming
type Module struct {
config *config.Config
processor processing.Processor
log *logrus.Logger
}
// New returns a new streaming module
func New(config *config.Config, processor processing.Processor, log *logrus.Logger) api.ClientModule {
return &Module{
config: config,
processor: processor,
log: log,
}
}
// Route attaches all routes from this module to the given router
func (m *Module) Route(r router.Router) error {
r.AttachHandler(http.MethodGet, BasePath, m.StreamGETHandler)
return nil
}

View File

@ -56,6 +56,7 @@ type Server interface {
HandleAuthorizeRequest(w http.ResponseWriter, r *http.Request) error HandleAuthorizeRequest(w http.ResponseWriter, r *http.Request) error
ValidationBearerToken(r *http.Request) (oauth2.TokenInfo, error) ValidationBearerToken(r *http.Request) (oauth2.TokenInfo, error)
GenerateUserAccessToken(ti oauth2.TokenInfo, clientSecret string, userID string) (accessToken oauth2.TokenInfo, err error) GenerateUserAccessToken(ti oauth2.TokenInfo, clientSecret string, userID string) (accessToken oauth2.TokenInfo, err error)
LoadAccessToken(ctx context.Context, access string) (accessToken oauth2.TokenInfo, err error)
} }
// s fulfils the Server interface using the underlying oauth2 server // s fulfils the Server interface using the underlying oauth2 server
@ -171,3 +172,7 @@ func (s *s) GenerateUserAccessToken(ti oauth2.TokenInfo, clientSecret string, us
s.log.Tracef("obtained user-level access token: %+v", accessToken) s.log.Tracef("obtained user-level access token: %+v", accessToken)
return accessToken, nil return accessToken, nil
} }
func (s *s) LoadAccessToken(ctx context.Context, access string) (accessToken oauth2.TokenInfo, err error) {
return s.server.Manager.LoadAccessToken(ctx, access)
}

View File

@ -22,6 +22,7 @@ import (
"context" "context"
"net/http" "net/http"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/blob" "github.com/superseriousbusiness/gotosocial/internal/blob"
@ -33,6 +34,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing/synchronous/status" "github.com/superseriousbusiness/gotosocial/internal/processing/synchronous/status"
"github.com/superseriousbusiness/gotosocial/internal/processing/synchronous/streaming"
"github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/timeline"
"github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/visibility" "github.com/superseriousbusiness/gotosocial/internal/visibility"
@ -132,6 +134,11 @@ type Processor interface {
// PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. // PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters.
PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, gtserror.WithCode) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, gtserror.WithCode)
// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API
AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error)
// StreamForAccount streams to websocket connection c for an account, with the given streamType.
StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode
/* /*
FEDERATION API-FACING PROCESSING FUNCTIONS FEDERATION API-FACING PROCESSING FUNCTIONS
These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply
@ -192,7 +199,8 @@ type processor struct {
SUB-PROCESSORS SUB-PROCESSORS
*/ */
statusProcessor status.Processor statusProcessor status.Processor
streamingProcessor streaming.Processor
} }
// NewProcessor returns a new Processor that uses the given federator and logger // NewProcessor returns a new Processor that uses the given federator and logger
@ -202,6 +210,7 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f
fromFederator := make(chan gtsmodel.FromFederator, 1000) fromFederator := make(chan gtsmodel.FromFederator, 1000)
statusProcessor := status.New(db, tc, config, fromClientAPI, log) statusProcessor := status.New(db, tc, config, fromClientAPI, log)
streamingProcessor := streaming.New(db, tc, oauthServer, config, log)
return &processor{ return &processor{
fromClientAPI: fromClientAPI, fromClientAPI: fromClientAPI,
@ -218,7 +227,8 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f
db: db, db: db,
filter: visibility.NewFilter(db, log), filter: visibility.NewFilter(db, log),
statusProcessor: statusProcessor, statusProcessor: statusProcessor,
streamingProcessor: streamingProcessor,
} }
} }

View File

@ -0,0 +1,15 @@
package processing
import (
"github.com/gorilla/websocket"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) {
return p.streamingProcessor.AuthorizeStreamingRequest(accessToken)
}
func (p *processor) StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode {
return p.streamingProcessor.StreamForAccount(c, account, streamType)
}

View File

@ -0,0 +1,33 @@
package streaming
import (
"context"
"fmt"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) {
ti, err := p.oauthServer.LoadAccessToken(context.Background(), accessToken)
if err != nil {
return nil, fmt.Errorf("AuthorizeStreamingRequest: error loading access token: %s", err)
}
uid := ti.GetUserID()
if uid == "" {
return nil, fmt.Errorf("AuthorizeStreamingRequest: no userid in token")
}
// fetch user's and account for this user id
user := &gtsmodel.User{}
if err := p.db.GetByID(uid, user); err != nil || user == nil {
return nil, fmt.Errorf("AuthorizeStreamingRequest: no user found for validated uid %s", uid)
}
acct := &gtsmodel.Account{}
if err := p.db.GetByID(user.AccountID, acct); err != nil || acct == nil {
return nil, fmt.Errorf("AuthorizeStreamingRequest: no account retrieved for user with id %s", uid)
}
return acct, nil
}

View File

@ -0,0 +1,22 @@
package streaming
import (
"github.com/gorilla/websocket"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
func (p *processor) StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode {
v, loaded := p.streamMap.LoadOrStore(account.ID, sync.Slice)
if loaded {
}
return nil
}
type streams struct {
accountID string
}

View File

@ -0,0 +1,45 @@
package streaming
import (
"sync"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/visibility"
)
// Processor wraps a bunch of functions for processing streaming.
type Processor interface {
// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API
AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error)
StreamForAccount(c *websocket.Conn, account *gtsmodel.Account, streamType string) gtserror.WithCode
}
type processor struct {
tc typeutils.TypeConverter
config *config.Config
db db.DB
filter visibility.Filter
log *logrus.Logger
oauthServer oauth.Server
streamMap *sync.Map
}
// New returns a new status processor.
func New(db db.DB, tc typeutils.TypeConverter, oauthServer oauth.Server, config *config.Config, log *logrus.Logger) Processor {
return &processor{
tc: tc,
config: config,
db: db,
filter: visibility.NewFilter(db, log),
log: log,
oauthServer: oauthServer,
streamMap: &sync.Map{},
}
}