diff --git a/go.mod b/go.mod index 79311b18b..38c1b1fa9 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( codeberg.org/gruf/go-store/v2 v2.0.7 github.com/buckket/go-blurhash v1.1.0 github.com/coreos/go-oidc/v3 v3.4.0 + github.com/cornelk/hashmap v1.0.8 github.com/disintegration/imaging v1.6.2 github.com/gin-contrib/cors v1.4.0 github.com/gin-contrib/gzip v0.0.6 @@ -67,7 +68,6 @@ require ( codeberg.org/gruf/go-pools v1.1.0 // indirect codeberg.org/gruf/go-sched v1.1.1 // indirect github.com/aymerick/douceur v0.2.0 // indirect - github.com/cornelk/hashmap v1.0.8 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dsoprea/go-exif/v3 v3.0.0-20210625224831-a6301f85c82b // indirect github.com/dsoprea/go-iptc v0.0.0-20200610044640-bc9ca208b413 // indirect diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index 279a0c3c1..377b9e041 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -26,6 +26,7 @@ "reflect" "runtime" + "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-runners" "github.com/superseriousbusiness/gotosocial/internal/log" ) @@ -35,7 +36,7 @@ type WorkerPool[MsgType any] struct { workers runners.WorkerPool process func(context.Context, MsgType) error nw, nq int - prefix string // contains type prefix for logging + wtype string // contains worker type for logging } // New returns a new WorkerPool[MsgType] with given number of workers and queue ratio, @@ -61,12 +62,12 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType process: nil, nw: workers, nq: workers * queueRatio, - prefix: fmt.Sprintf("worker.Worker[%s]", msgType), + wtype: fmt.Sprintf("worker.Worker[%s]", msgType), } - // Log new worker creation with type prefix + // Log new worker creation with worker type prefix log.Infof("%s created with workers=%d queue=%d", - w.prefix, + w.wtype, workers, workers*queueRatio, ) @@ -76,7 +77,7 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType // Start will attempt to start the underlying worker pool, or return error. func (w *WorkerPool[MsgType]) Start() error { - log.Infof("%s starting", w.prefix) + log.Infof("%s starting", w.wtype) // Check processor was set if w.process == nil { @@ -93,7 +94,7 @@ func (w *WorkerPool[MsgType]) Start() error { // Stop will attempt to stop the underlying worker pool, or return error. func (w *WorkerPool[MsgType]) Stop() error { - log.Infof("%s stopping", w.prefix) + log.Infof("%s stopping", w.wtype) // Attempt to stop pool if !w.workers.Stop() { @@ -106,19 +107,34 @@ func (w *WorkerPool[MsgType]) Stop() error { // SetProcessor will set the Worker's processor function, which is called for each queued message. func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { if w.process != nil { - log.Fatalf("%s Worker.process is already set", w.prefix) + log.Panicf("%s Worker.process is already set", w.wtype) } w.process = fn } // Queue will queue provided message to be processed with there's a free worker. func (w *WorkerPool[MsgType]) Queue(msg MsgType) { - log.Tracef("%s queueing message (queue=%d): %+v", - w.prefix, w.workers.Queue(), msg, - ) - w.workers.Enqueue(func(ctx context.Context) { + log.Tracef("%s queueing message: %+v", w.wtype, msg) + + // Create new process function for msg + process := func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { - log.Errorf("%s %v", w.prefix, err) + log.WithFields(kv.Fields{ + kv.Field{K: "type", V: w.wtype}, + kv.Field{K: "error", V: err}, + }...).Error("message processing error") } - }) + } + + // Attempt a fast-enqueue of process + if !w.workers.EnqueueNow(process) { + // No spot acquired, log warning + log.WithFields(kv.Fields{ + kv.Field{K: "type", V: w.wtype}, + kv.Field{K: "queue", V: w.workers.Queue()}, + }...).Warn("full worker queue") + + // Block on enqueuing process func + w.workers.Enqueue(process) + } } diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 7aa0cd8ea..8792e5b82 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -26,6 +26,11 @@ "net/netip" "runtime" "time" + + "codeberg.org/gruf/go-bytesize" + "codeberg.org/gruf/go-kv" + "github.com/cornelk/hashmap" + "github.com/superseriousbusiness/gotosocial/internal/log" ) // ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed. @@ -42,8 +47,8 @@ // configuration values passed to initialized http.Transport{} // and http.Client{}, along with httpclient.Client{} specific. type Config struct { - // MaxOpenConns limits the max number of concurrent open connections. - MaxOpenConns int + // MaxOpenConnsPerHost limits the max number of open connections to a host. + MaxOpenConnsPerHost int // MaxIdleConns: see http.Transport{}.MaxIdleConns. MaxIdleConns int @@ -80,8 +85,9 @@ type Config struct { // is available (context channels still respected) type Client struct { client http.Client - rc *requestQueue - bmax int64 + queue *hashmap.Map[string, chan struct{}] + bmax int64 // max response body size + cmax int // max open conns per host } // New returns a new instance of Client initialized using configuration. @@ -94,20 +100,20 @@ func New(cfg Config) *Client { Resolver: &net.Resolver{}, } - if cfg.MaxOpenConns <= 0 { + if cfg.MaxOpenConnsPerHost <= 0 { // By default base this value on GOMAXPROCS. maxprocs := runtime.GOMAXPROCS(0) - cfg.MaxOpenConns = maxprocs * 10 + cfg.MaxOpenConnsPerHost = maxprocs * 20 } if cfg.MaxIdleConns <= 0 { // By default base this value on MaxOpenConns - cfg.MaxIdleConns = cfg.MaxOpenConns * 10 + cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10 } if cfg.MaxBodySize <= 0 { // By default set this to a reasonable 40MB - cfg.MaxBodySize = 40 * 1024 * 1024 + cfg.MaxBodySize = int64(40 * bytesize.MiB) } // Protect dialer with IP range sanitizer @@ -117,11 +123,10 @@ func New(cfg Config) *Client { }).Sanitize // Prepare client fields - c.bmax = cfg.MaxBodySize - c.rc = &requestQueue{ - maxOpenConns: cfg.MaxOpenConns, - } c.client.Timeout = cfg.Timeout + c.cmax = cfg.MaxOpenConnsPerHost + c.bmax = cfg.MaxBodySize + c.queue = hashmap.New[string, chan struct{}]() // Set underlying HTTP client roundtripper c.client.Transport = &http.Transport{ @@ -145,17 +150,16 @@ func New(cfg Config) *Client { // as the standard http.Client{}.Do() implementation except that response body will // be wrapped by an io.LimitReader() to limit response body sizes. func (c *Client) Do(req *http.Request) (*http.Response, error) { - // request a spot in the wait queue... - wait, release := c.rc.getWaitSpot(req.Host, req.Method) + // Get host's wait queue + wait := c.wait(req.Host) + + var ok bool - // ... and wait our turn select { - case <-req.Context().Done(): - // the request was canceled before we - // got to our turn: no need to release - return nil, req.Context().Err() + // Quickly try grab a spot case wait <- struct{}{}: // it's our turn! + ok = true // NOTE: // Ideally here we would set the slot release to happen either @@ -167,7 +171,27 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { // that connections may not be closed until response body is closed. // The current implementation will reduce the viability of denial of // service attacks, but if there are future issues heed this advice :] - defer release() + defer func() { <-wait }() + default: + } + + if !ok { + // No spot acquired, log warning + log.WithFields(kv.Fields{ + {K: "queue", V: len(wait)}, + {K: "method", V: req.Method}, + {K: "host", V: req.Host}, + {K: "uri", V: req.URL.RequestURI()}, + }...).Warn("full request queue") + + select { + case <-req.Context().Done(): + // the request was canceled before we + // got to our turn: no need to release + return nil, req.Context().Err() + case wait <- struct{}{}: + defer func() { <-wait }() + } } // Firstly, ensure this is a valid request @@ -208,3 +232,17 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { return rsp, nil } + +// wait acquires the 'wait' queue for the given host string, or allocates new. +func (c *Client) wait(host string) chan struct{} { + // Look for an existing queue + queue, ok := c.queue.Get(host) + if ok { + return queue + } + + // Allocate a new host queue (or return a sneaky existing one). + queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax)) + + return queue +} diff --git a/internal/httpclient/queue.go b/internal/httpclient/queue.go deleted file mode 100644 index 8cb1274be..000000000 --- a/internal/httpclient/queue.go +++ /dev/null @@ -1,68 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -package httpclient - -import ( - "strings" - "sync" - - "github.com/superseriousbusiness/gotosocial/internal/log" -) - -type requestQueue struct { - hostQueues sync.Map // map of `hostQueue` - maxOpenConns int // max open conns per host per request method -} - -type hostQueue struct { - slotsByMethod sync.Map -} - -// getWaitSpot returns a wait channel and release function for http clients -// that want to do requests politely: that is, wait for their turn. -// -// To wait, a caller should do a select on an attempted insert into the -// returned wait channel. Once the insert succeeds, then the caller should -// proceed with the http request that pertains to the given host + method. -// It doesn't matter what's put into the wait channel, just any interface{}. -// -// When the caller is finished with their http request, they should free up the -// slot they were occupying in the wait queue, by calling the release function. -// -// The reason for the caller needing to provide host and method, is that each -// remote host has a separate wait queue, and there's a separate wait queue -// per method for that host as well. This ensures that outgoing requests can still -// proceed for others hosts and methods while other requests are undergoing, -// while also preventing one host from being spammed with, for example, a -// shitload of GET requests all at once. -func (rc *requestQueue) getWaitSpot(host string, method string) (wait chan<- interface{}, release func()) { - hostQueueI, _ := rc.hostQueues.LoadOrStore(host, new(hostQueue)) - hostQueue, ok := hostQueueI.(*hostQueue) - if !ok { - log.Panic("hostQueueI was not a *hostQueue") - } - - waitSlotI, _ := hostQueue.slotsByMethod.LoadOrStore(strings.ToUpper(method), make(chan interface{}, rc.maxOpenConns)) - methodQueue, ok := waitSlotI.(chan interface{}) - if !ok { - log.Panic("waitSlotI was not a chan interface{}") - } - - return methodQueue, func() { <-methodQueue } -} diff --git a/internal/httpclient/queue_test.go b/internal/httpclient/queue_test.go deleted file mode 100644 index c6d6ad324..000000000 --- a/internal/httpclient/queue_test.go +++ /dev/null @@ -1,106 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -package httpclient - -import ( - "net/http" - "testing" - "time" - - "github.com/stretchr/testify/suite" -) - -type QueueTestSuite struct { - suite.Suite -} - -func (suite *QueueTestSuite) TestQueue() { - maxOpenConns := 5 - waitTimeout := 1 * time.Second - - rc := &requestQueue{ - maxOpenConns: maxOpenConns, - } - - // fill all the open connections - var release func() - for i, n := range make([]interface{}, maxOpenConns) { - w, r := rc.getWaitSpot("example.org", http.MethodPost) - w <- n - if i == maxOpenConns-1 { - // save the last release function - release = r - } - } - - // try to wait again for the same host/method combo, it should timeout - waitAgain, _ := rc.getWaitSpot("example.org", "post") - - select { - case waitAgain <- struct{}{}: - suite.FailNow("first wait did not time out") - case <-time.After(waitTimeout): - break - } - - // now close the final release that we derived earlier - release() - - // try waiting again, it should work this time - select { - case waitAgain <- struct{}{}: - break - case <-time.After(waitTimeout): - suite.FailNow("second wait timed out") - } - - // the POST queue is now sitting on full - suite.Len(waitAgain, maxOpenConns) - - // we should still be able to make a GET for the same host though - getWait, getRelease := rc.getWaitSpot("example.org", http.MethodGet) - select { - case getWait <- struct{}{}: - break - case <-time.After(waitTimeout): - suite.FailNow("get wait timed out") - } - - // the GET queue has one request waiting - suite.Len(getWait, 1) - // clear it... - getRelease() - suite.Empty(getWait) - - // even though the POST queue for example.org is full, we - // should still be able to make a POST request to another host :) - waitForAnotherHost, _ := rc.getWaitSpot("somewhere.else", http.MethodPost) - select { - case waitForAnotherHost <- struct{}{}: - break - case <-time.After(waitTimeout): - suite.FailNow("get wait timed out") - } - - suite.Len(waitForAnotherHost, 1) -} - -func TestQueueTestSuite(t *testing.T) { - suite.Run(t, &QueueTestSuite{}) -}