[feature] Push status edit messages into open streams (#2418)

* push status edit messages into open streams

* fix a few comments

* test++

* commented out code? moi?
This commit is contained in:
Sam Lade 2023-12-16 11:55:49 +00:00 committed by GitHub
parent fbe4e60232
commit 285d55dda8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 363 additions and 0 deletions

View file

@ -0,0 +1,38 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package stream
import (
"encoding/json"
"fmt"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/stream"
)
// StatusUpdate streams the given edited status to any open, appropriate
// streams belonging to the given account.
func (p *Processor) StatusUpdate(s *apimodel.Status, account *gtsmodel.Account, streamTypes []string) error {
bytes, err := json.Marshal(s)
if err != nil {
return fmt.Errorf("error marshalling status to json: %s", err)
}
return p.toAccount(string(bytes), stream.EventTypeStatusUpdate, streamTypes, account.ID)
}

View file

@ -0,0 +1,137 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package stream_test
import (
"bytes"
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/stream"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
)
type StatusUpdateTestSuite struct {
StreamTestSuite
}
func (suite *StatusUpdateTestSuite) TestStreamNotification() {
account := suite.testAccounts["local_account_1"]
openStream, errWithCode := suite.streamProcessor.Open(context.Background(), account, "user")
suite.NoError(errWithCode)
editedStatus := suite.testStatuses["remote_account_1_status_1"]
apiStatus, err := typeutils.NewConverter(&suite.state).StatusToAPIStatus(context.Background(), editedStatus, account)
suite.NoError(err)
err = suite.streamProcessor.StatusUpdate(apiStatus, account, []string{stream.TimelineHome})
suite.NoError(err)
msg := <-openStream.Messages
dst := new(bytes.Buffer)
err = json.Indent(dst, []byte(msg.Payload), "", " ")
suite.NoError(err)
suite.Equal(`{
"id": "01FVW7JHQFSFK166WWKR8CBA6M",
"created_at": "2021-09-20T10:40:37.000Z",
"in_reply_to_id": null,
"in_reply_to_account_id": null,
"sensitive": false,
"spoiler_text": "",
"visibility": "unlisted",
"language": "en",
"uri": "http://fossbros-anonymous.io/users/foss_satan/statuses/01FVW7JHQFSFK166WWKR8CBA6M",
"url": "http://fossbros-anonymous.io/@foss_satan/statuses/01FVW7JHQFSFK166WWKR8CBA6M",
"replies_count": 0,
"reblogs_count": 0,
"favourites_count": 0,
"favourited": false,
"reblogged": false,
"muted": false,
"bookmarked": false,
"pinned": false,
"content": "dark souls status bot: \"thoughts of dog\"",
"reblog": null,
"account": {
"id": "01F8MH5ZK5VRH73AKHQM6Y9VNX",
"username": "foss_satan",
"acct": "foss_satan@fossbros-anonymous.io",
"display_name": "big gerald",
"locked": false,
"discoverable": true,
"bot": false,
"created_at": "2021-09-26T10:52:36.000Z",
"note": "i post about like, i dunno, stuff, or whatever!!!!",
"url": "http://fossbros-anonymous.io/@foss_satan",
"avatar": "",
"avatar_static": "",
"header": "http://localhost:8080/assets/default_header.png",
"header_static": "http://localhost:8080/assets/default_header.png",
"followers_count": 0,
"following_count": 0,
"statuses_count": 3,
"last_status_at": "2021-09-11T09:40:37.000Z",
"emojis": [],
"fields": []
},
"media_attachments": [
{
"id": "01FVW7RXPQ8YJHTEXYPE7Q8ZY0",
"type": "image",
"url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/original/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg",
"text_url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/original/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg",
"preview_url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/small/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg",
"remote_url": "http://fossbros-anonymous.io/attachments/original/13bbc3f8-2b5e-46ea-9531-40b4974d9912.jpg",
"preview_remote_url": "http://fossbros-anonymous.io/attachments/small/a499f55b-2d1e-4acd-98d2-1ac2ba6d79b9.jpg",
"meta": {
"original": {
"width": 472,
"height": 291,
"size": "472x291",
"aspect": 1.6219932
},
"small": {
"width": 472,
"height": 291,
"size": "472x291",
"aspect": 1.6219932
},
"focus": {
"x": 0,
"y": 0
}
},
"description": "tweet from thoughts of dog: i drank. all the water. in my bowl. earlier. but just now. i returned. to the same bowl. and it was. full again.. the bowl. is haunted",
"blurhash": "LARysgM_IU_3~pD%M_Rj_39FIAt6"
}
],
"mentions": [],
"tags": [],
"emojis": [],
"card": null,
"poll": null
}`, dst.String())
suite.Equal(msg.Event, "status.update")
}
func TestStatusUpdateTestSuite(t *testing.T) {
suite.Run(t, &StatusUpdateTestSuite{})
}

View file

@ -30,6 +30,7 @@
type StreamTestSuite struct {
suite.Suite
testAccounts map[string]*gtsmodel.Account
testStatuses map[string]*gtsmodel.Status
testTokens map[string]*gtsmodel.Token
db db.DB
oauthServer oauth.Server
@ -45,6 +46,7 @@ func (suite *StreamTestSuite) SetupTest() {
testrig.InitTestConfig()
suite.testAccounts = testrig.NewTestAccounts()
suite.testStatuses = testrig.NewTestStatuses()
suite.testTokens = testrig.NewTestTokens()
suite.db = testrig.NewTestDB(&suite.state)
suite.state.DB = suite.db

View file

@ -416,6 +416,11 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg messages.FromClientAP
}
}
// Push message that the status has been edited to streams.
if err := p.surface.timelineStatusUpdate(ctx, status); err != nil {
log.Errorf(ctx, "error streaming status edit: %v", err)
}
return nil
}

View file

@ -530,6 +530,11 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e
}
}
// Push message that the status has been edited to streams.
if err := p.surface.timelineStatusUpdate(ctx, status); err != nil {
log.Errorf(ctx, "error streaming status edit: %v", err)
}
return nil
}

View file

@ -390,3 +390,176 @@ func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID st
Errorf("error unpreparing status from list timelines: %v", err)
}
}
// timelineStatusUpdate looks up HOME and LIST timelines of accounts
// that follow the the status author and pushes edit messages into any
// active streams.
// Note that calling invalidateStatusFromTimelines takes care of the
// state in general, we just need to do this for any streams that are
// open right now.
func (s *surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Status) error {
// Ensure status fully populated; including account, mentions, etc.
if err := s.state.DB.PopulateStatus(ctx, status); err != nil {
return gtserror.Newf("error populating status with id %s: %w", status.ID, err)
}
// Get all local followers of the account that posted the status.
follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID)
if err != nil {
return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err)
}
// If the poster is also local, add a fake entry for them
// so they can see their own status in their timeline.
if status.Account.IsLocal() {
follows = append(follows, &gtsmodel.Follow{
AccountID: status.AccountID,
Account: status.Account,
Notify: func() *bool { b := false; return &b }(), // Account shouldn't notify itself.
ShowReblogs: func() *bool { b := true; return &b }(), // Account should show own reblogs.
})
}
// Push to streams for each local follower of this account.
if err := s.timelineStatusUpdateForFollowers(ctx, status, follows); err != nil {
return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err)
}
return nil
}
// timelineStatusUpdateForFollowers iterates through the given
// slice of followers of the account that posted the given status,
// pushing update messages into open list/home streams of each
// follower.
func (s *surface) timelineStatusUpdateForFollowers(
ctx context.Context,
status *gtsmodel.Status,
follows []*gtsmodel.Follow,
) error {
var (
errs gtserror.MultiError
)
for _, follow := range follows {
// Check to see if the status is timelineable for this follower,
// taking account of its visibility, who it replies to, and, if
// it's a reblog, whether follower account wants to see reblogs.
//
// If it's not timelineable, we can just stop early, since lists
// are prettymuch subsets of the home timeline, so if it shouldn't
// appear there, it shouldn't appear in lists either.
timelineable, err := s.filter.StatusHomeTimelineable(
ctx, follow.Account, status,
)
if err != nil {
errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err)
continue
}
if !timelineable {
// Nothing to do.
continue
}
// Add status to any relevant lists
// for this follow, if applicable.
s.listTimelineStatusUpdateForFollow(
ctx,
status,
follow,
&errs,
)
// Add status to home timeline for owner
// of this follow, if applicable.
err = s.timelineStreamStatusUpdate(
ctx,
follow.Account,
status,
stream.TimelineHome,
)
if err != nil {
errs.Appendf("error home timelining status: %w", err)
continue
}
}
return errs.Combine()
}
// listTimelineStatusUpdateForFollow pushes edits of the given status
// into any eligible lists streams opened by the given follower.
func (s *surface) listTimelineStatusUpdateForFollow(
ctx context.Context,
status *gtsmodel.Status,
follow *gtsmodel.Follow,
errs *gtserror.MultiError,
) {
// To put this status in appropriate list timelines,
// we need to get each listEntry that pertains to
// this follow. Then, we want to iterate through all
// those list entries, and add the status to the list
// that the entry belongs to if it meets criteria for
// inclusion in the list.
// Get every list entry that targets this follow's ID.
listEntries, err := s.state.DB.GetListEntriesForFollowID(
// We only need the list IDs.
gtscontext.SetBarebones(ctx),
follow.ID,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
errs.Appendf("error getting list entries: %w", err)
return
}
// Check eligibility for each list entry (if any).
for _, listEntry := range listEntries {
eligible, err := s.listEligible(ctx, listEntry, status)
if err != nil {
errs.Appendf("error checking list eligibility: %w", err)
continue
}
if !eligible {
// Don't add this.
continue
}
// At this point we are certain this status
// should be included in the timeline of the
// list that this list entry belongs to.
if err := s.timelineStreamStatusUpdate(
ctx,
follow.Account,
status,
stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list
); err != nil {
errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err)
// implicit continue
}
}
}
// timelineStatusUpdate streams the edited status to the user using the
// given streamType.
func (s *surface) timelineStreamStatusUpdate(
ctx context.Context,
account *gtsmodel.Account,
status *gtsmodel.Status,
streamType string,
) error {
apiStatus, err := s.converter.StatusToAPIStatus(ctx, status, account)
if err != nil {
err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err)
return err
}
if err := s.stream.StatusUpdate(apiStatus, account, []string{streamType}); err != nil {
err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err)
return err
}
return nil
}

View file

@ -26,6 +26,9 @@
EventTypeUpdate string = "update"
// EventTypeDelete -- something should be deleted from a user
EventTypeDelete string = "delete"
// EventTypeStatusUpdate -- something in the user's timeline has been edited
// (yes this is a confusing name, blame Mastodon)
EventTypeStatusUpdate string = "status.update"
)
const (