mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-01-22 08:36:24 +01:00
8fdd358f4b
* Implement conversations API * Sort and page conversations by last status ID * Appease linter * Fix deleting conversations and statuses * Refactor to make migrations automatic * Lint * Update tests post-merge * Fixes from live-fire testing * Linter caught a format problem * Refactor tests, fix cache * Negative test for non-DMs * Run conversations advanced migration on testrig startup as well as regular server startup * Document (lack of) side effects of API method for deleting a conversation * Make not-found check less nested for readability * Rename PutConversation to UpsertConversation * Use util.Ptr instead of IIFE * Reduce cache used by conversations * Remove unnecessary TableExpr/ColumnExpr * Use struct tags for both unique constraints on Conversation * Make it clear how paging with GetDirectStatusIDsBatch should be used * Let conversation paging skip conversations it can't render * Use Bun NewDropTable * Convert delete raw query to Bun * Convert update raw query to Bun * Convert latestConversationStatusesTempTable raw query partially to Bun * Convert conversationStatusesTempTable raw query partially to Bun * Rename field used to store result of MaxDirectStatusID * Move advanced migrations to their own tiny processor * Catch up util function name with main * Remove json.… wrappers * Remove redundant check * Combine error checks * Replace map with slice of structs * Address processor/type converter comments - Add context info for errors - Extract some common processor code into shared methods - Move conversation eligibility check ahead of populating conversation * Add error context when dropping temp tables
389 lines
9 KiB
Go
389 lines
9 KiB
Go
// GoToSocial
|
|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
|
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package stream
|
|
|
|
import (
|
|
"context"
|
|
"maps"
|
|
"slices"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
const (
|
|
// EventTypeNotification -- a user
|
|
// should be shown a notification.
|
|
EventTypeNotification = "notification"
|
|
|
|
// EventTypeUpdate -- a user should
|
|
// be shown an update in their timeline.
|
|
EventTypeUpdate = "update"
|
|
|
|
// EventTypeDelete -- something
|
|
// should be deleted from a user.
|
|
EventTypeDelete = "delete"
|
|
|
|
// EventTypeStatusUpdate -- something in the
|
|
// user's timeline has been edited (yes this
|
|
// is a confusing name, blame Mastodon ...).
|
|
EventTypeStatusUpdate = "status.update"
|
|
|
|
// EventTypeFiltersChanged -- the user's filters
|
|
// (including keywords and statuses) have changed.
|
|
EventTypeFiltersChanged = "filters_changed"
|
|
|
|
// EventTypeConversation -- a user
|
|
// should be shown an updated conversation.
|
|
EventTypeConversation = "conversation"
|
|
)
|
|
|
|
const (
|
|
// TimelineLocal:
|
|
// All public posts originating from this
|
|
// server. Analogous to the local timeline.
|
|
TimelineLocal = "public:local"
|
|
|
|
// TimelinePublic:
|
|
// All public posts known to the server.
|
|
// Analogous to the federated timeline.
|
|
TimelinePublic = "public"
|
|
|
|
// TimelineHome:
|
|
// Events related to the current user, such
|
|
// as home feed updates and notifications.
|
|
TimelineHome = "user"
|
|
|
|
// TimelineNotifications:
|
|
// Notifications for the current user.
|
|
TimelineNotifications = "user:notification"
|
|
|
|
// TimelineDirect:
|
|
// Updates to direct conversations.
|
|
TimelineDirect = "direct"
|
|
|
|
// TimelineList:
|
|
// Updates to a specific list.
|
|
TimelineList = "list"
|
|
)
|
|
|
|
// AllStatusTimelines contains all Timelines
|
|
// that a status could conceivably be delivered
|
|
// to, useful for sending out status deletes.
|
|
var AllStatusTimelines = []string{
|
|
TimelineLocal,
|
|
TimelinePublic,
|
|
TimelineHome,
|
|
TimelineDirect,
|
|
TimelineList,
|
|
}
|
|
|
|
type Streams struct {
|
|
streams map[string][]*Stream
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
// Open will open open a new Stream for given account ID and stream types, the given context will be passed to Stream.
|
|
func (s *Streams) Open(accountID string, streamTypes ...string) *Stream {
|
|
if len(streamTypes) == 0 {
|
|
panic("no stream types given")
|
|
}
|
|
|
|
// Prep new Stream.
|
|
str := new(Stream)
|
|
str.done = make(chan struct{})
|
|
str.msgCh = make(chan Message, 50) // TODO: make configurable
|
|
for _, streamType := range streamTypes {
|
|
str.Subscribe(streamType)
|
|
}
|
|
|
|
// TODO: add configurable
|
|
// max streams per account.
|
|
|
|
// Acquire lock.
|
|
s.mutex.Lock()
|
|
|
|
if s.streams == nil {
|
|
// Main stream-map needs allocating.
|
|
s.streams = make(map[string][]*Stream)
|
|
}
|
|
|
|
// Add new stream for account.
|
|
strs := s.streams[accountID]
|
|
strs = append(strs, str)
|
|
s.streams[accountID] = strs
|
|
|
|
// Register close callback
|
|
// to remove stream from our
|
|
// internal map for this account.
|
|
str.close = func() {
|
|
s.mutex.Lock()
|
|
strs := s.streams[accountID]
|
|
strs = slices.DeleteFunc(strs, func(s *Stream) bool {
|
|
return s == str // remove 'str' ptr
|
|
})
|
|
s.streams[accountID] = strs
|
|
s.mutex.Unlock()
|
|
}
|
|
|
|
// Done with lock.
|
|
s.mutex.Unlock()
|
|
|
|
return str
|
|
}
|
|
|
|
// Post will post the given message to all streams of given account ID matching type.
|
|
func (s *Streams) Post(ctx context.Context, accountID string, msg Message) bool {
|
|
var deferred []func() bool
|
|
|
|
// Acquire lock.
|
|
s.mutex.Lock()
|
|
|
|
// Iterate all streams stored for account.
|
|
for _, str := range s.streams[accountID] {
|
|
|
|
// Check whether stream supports any of our message targets.
|
|
if stype := str.getStreamType(msg.Stream...); stype != "" {
|
|
|
|
// Rescope var
|
|
// to prevent
|
|
// ptr reuse.
|
|
stream := str
|
|
|
|
// Use a message copy to *only*
|
|
// include the supported stream.
|
|
msgCopy := Message{
|
|
Stream: []string{stype},
|
|
Event: msg.Event,
|
|
Payload: msg.Payload,
|
|
}
|
|
|
|
// Send message to supported stream
|
|
// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
|
|
// This prevents deadlocks between each
|
|
// msg channel and main Streams{} mutex.
|
|
deferred = append(deferred, func() bool {
|
|
return stream.send(ctx, msgCopy)
|
|
})
|
|
}
|
|
}
|
|
|
|
// Done with lock.
|
|
s.mutex.Unlock()
|
|
|
|
var ok bool
|
|
|
|
// Execute deferred outside lock.
|
|
for _, deferfn := range deferred {
|
|
v := deferfn()
|
|
ok = ok && v
|
|
}
|
|
|
|
return ok
|
|
}
|
|
|
|
// PostAll will post the given message to all streams with matching types.
|
|
func (s *Streams) PostAll(ctx context.Context, msg Message) bool {
|
|
var deferred []func() bool
|
|
|
|
// Acquire lock.
|
|
s.mutex.Lock()
|
|
|
|
// Iterate ALL stored streams.
|
|
for _, strs := range s.streams {
|
|
for _, str := range strs {
|
|
|
|
// Check whether stream supports any of our message targets.
|
|
if stype := str.getStreamType(msg.Stream...); stype != "" {
|
|
|
|
// Rescope var
|
|
// to prevent
|
|
// ptr reuse.
|
|
stream := str
|
|
|
|
// Use a message copy to *only*
|
|
// include the supported stream.
|
|
msgCopy := Message{
|
|
Stream: []string{stype},
|
|
Event: msg.Event,
|
|
Payload: msg.Payload,
|
|
}
|
|
|
|
// Send message to supported stream
|
|
// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
|
|
// This prevents deadlocks between each
|
|
// msg channel and main Streams{} mutex.
|
|
deferred = append(deferred, func() bool {
|
|
return stream.send(ctx, msgCopy)
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Done with lock.
|
|
s.mutex.Unlock()
|
|
|
|
var ok bool
|
|
|
|
// Execute deferred outside lock.
|
|
for _, deferfn := range deferred {
|
|
v := deferfn()
|
|
ok = ok && v
|
|
}
|
|
|
|
return ok
|
|
}
|
|
|
|
// Stream represents one
|
|
// open stream for a client.
|
|
type Stream struct {
|
|
|
|
// atomically updated ptr to a read-only copy
|
|
// of supported stream types in a hashmap. this
|
|
// gets updated via CAS operations in .cas().
|
|
types atomic.Pointer[map[string]struct{}]
|
|
|
|
// protects stream close.
|
|
done chan struct{}
|
|
|
|
// inbound msg ch.
|
|
msgCh chan Message
|
|
|
|
// close hook to remove
|
|
// stream from Streams{}.
|
|
close func()
|
|
}
|
|
|
|
// Subscribe will add given type to given types this stream supports.
|
|
func (s *Stream) Subscribe(streamType string) {
|
|
s.cas(func(m map[string]struct{}) bool {
|
|
if _, ok := m[streamType]; ok {
|
|
return false
|
|
}
|
|
m[streamType] = struct{}{}
|
|
return true
|
|
})
|
|
}
|
|
|
|
// Unsubscribe will remove given type (if found) from types this stream supports.
|
|
func (s *Stream) Unsubscribe(streamType string) {
|
|
s.cas(func(m map[string]struct{}) bool {
|
|
if _, ok := m[streamType]; !ok {
|
|
return false
|
|
}
|
|
delete(m, streamType)
|
|
return true
|
|
})
|
|
}
|
|
|
|
// getStreamType returns the first stream type in given list that stream supports.
|
|
func (s *Stream) getStreamType(streamTypes ...string) string {
|
|
if ptr := s.types.Load(); ptr != nil {
|
|
for _, streamType := range streamTypes {
|
|
if _, ok := (*ptr)[streamType]; ok {
|
|
return streamType
|
|
}
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// send will block on posting a new Message{}, returning early with
|
|
// a false value if provided context is canceled, or stream closed.
|
|
func (s *Stream) send(ctx context.Context, msg Message) bool {
|
|
select {
|
|
case <-s.done:
|
|
return false
|
|
case <-ctx.Done():
|
|
return false
|
|
case s.msgCh <- msg:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Recv will block on receiving Message{}, returning early with a
|
|
// false value if provided context is canceled, or stream closed.
|
|
func (s *Stream) Recv(ctx context.Context) (Message, bool) {
|
|
select {
|
|
case <-s.done:
|
|
return Message{}, false
|
|
case <-ctx.Done():
|
|
return Message{}, false
|
|
case msg := <-s.msgCh:
|
|
return msg, true
|
|
}
|
|
}
|
|
|
|
// Close will close the underlying context, finally
|
|
// removing it from the parent Streams per-account-map.
|
|
func (s *Stream) Close() {
|
|
select {
|
|
case <-s.done:
|
|
default:
|
|
close(s.done)
|
|
s.close()
|
|
}
|
|
}
|
|
|
|
// cas will perform a Compare And Swap operation on s.types using modifier func.
|
|
func (s *Stream) cas(fn func(map[string]struct{}) bool) {
|
|
if fn == nil {
|
|
panic("nil function")
|
|
}
|
|
for {
|
|
var m map[string]struct{}
|
|
|
|
// Get current value.
|
|
ptr := s.types.Load()
|
|
|
|
if ptr == nil {
|
|
// Allocate new types map.
|
|
m = make(map[string]struct{})
|
|
} else {
|
|
// Clone r-only map.
|
|
m = maps.Clone(*ptr)
|
|
}
|
|
|
|
// Apply
|
|
// changes.
|
|
if !fn(m) {
|
|
return
|
|
}
|
|
|
|
// Attempt to Compare And Swap ptr.
|
|
if s.types.CompareAndSwap(ptr, &m) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Message represents
|
|
// one streamed message.
|
|
type Message struct {
|
|
|
|
// All the stream types this
|
|
// message should be delivered to.
|
|
Stream []string `json:"stream"`
|
|
|
|
// The event type of the message
|
|
// (update/delete/notification etc)
|
|
Event string `json:"event"`
|
|
|
|
// The actual payload of the message. In case of an
|
|
// update or notification, this will be a JSON string.
|
|
Payload string `json:"payload"`
|
|
}
|