mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-01-25 01:56:42 +01:00
[chore] cleanup storage implementation, no need for multiple interface types (#1131)
Signed-off-by: kim <grufwub@gmail.com> Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
parent
c9d893fec1
commit
fcb9c0bb8b
35 changed files with 129 additions and 178 deletions
|
@ -71,7 +71,7 @@
|
||||||
dbService := testrig.NewTestDB()
|
dbService := testrig.NewTestDB()
|
||||||
testrig.StandardDBSetup(dbService, nil)
|
testrig.StandardDBSetup(dbService, nil)
|
||||||
router := testrig.NewTestRouter(dbService)
|
router := testrig.NewTestRouter(dbService)
|
||||||
var storageBackend storage.Driver
|
var storageBackend *storage.Driver
|
||||||
if os.Getenv("GTS_STORAGE_BACKEND") == "s3" {
|
if os.Getenv("GTS_STORAGE_BACKEND") == "s3" {
|
||||||
storageBackend = testrig.NewS3Storage()
|
storageBackend = testrig.NewS3Storage()
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -45,7 +45,7 @@ type AccountStandardTestSuite struct {
|
||||||
// standard suite interfaces
|
// standard suite interfaces
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
|
|
|
@ -45,7 +45,7 @@ type AdminStandardTestSuite struct {
|
||||||
// standard suite interfaces
|
// standard suite interfaces
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
|
|
|
@ -48,7 +48,7 @@
|
||||||
type AuthStandardTestSuite struct {
|
type AuthStandardTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
|
|
|
@ -48,7 +48,7 @@ type ServeFileTestSuite struct {
|
||||||
// standard suite interfaces
|
// standard suite interfaces
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
|
|
|
@ -43,7 +43,7 @@
|
||||||
type FollowRequestStandardTestSuite struct {
|
type FollowRequestStandardTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
|
|
|
@ -44,7 +44,7 @@ type InstanceStandardTestSuite struct {
|
||||||
// standard suite interfaces
|
// standard suite interfaces
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
|
|
|
@ -54,7 +54,7 @@ type MediaCreateTestSuite struct {
|
||||||
// standard suite interfaces
|
// standard suite interfaces
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage *storage.Local
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
|
|
|
@ -52,7 +52,7 @@ type MediaUpdateTestSuite struct {
|
||||||
// standard suite interfaces
|
// standard suite interfaces
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
|
|
|
@ -43,7 +43,7 @@ type StatusStandardTestSuite struct {
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
emailSender email.Sender
|
emailSender email.Sender
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
|
|
||||||
// standard suite models
|
// standard suite models
|
||||||
testTokens map[string]*gtsmodel.Token
|
testTokens map[string]*gtsmodel.Token
|
||||||
|
|
|
@ -42,7 +42,7 @@ type UserStandardTestSuite struct {
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
emailSender email.Sender
|
emailSender email.Sender
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
|
|
||||||
testTokens map[string]*gtsmodel.Token
|
testTokens map[string]*gtsmodel.Token
|
||||||
testClients map[string]*gtsmodel.Client
|
testClients map[string]*gtsmodel.Client
|
||||||
|
|
|
@ -50,7 +50,7 @@ type EmojiGetTestSuite struct {
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
emailSender email.Sender
|
emailSender email.Sender
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
oauthServer oauth.Server
|
oauthServer oauth.Server
|
||||||
securityModule *security.Module
|
securityModule *security.Module
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ type UserStandardTestSuite struct {
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
emailSender email.Sender
|
emailSender email.Sender
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
oauthServer oauth.Server
|
oauthServer oauth.Server
|
||||||
securityModule *security.Module
|
securityModule *security.Module
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ type WebfingerStandardTestSuite struct {
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
emailSender email.Sender
|
emailSender email.Sender
|
||||||
processor processing.Processor
|
processor processing.Processor
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
oauthServer oauth.Server
|
oauthServer oauth.Server
|
||||||
securityModule *security.Module
|
securityModule *security.Module
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
type DereferencerStandardTestSuite struct {
|
type DereferencerStandardTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
|
|
||||||
testRemoteStatuses map[string]vocab.ActivityStreamsNote
|
testRemoteStatuses map[string]vocab.ActivityStreamsNote
|
||||||
testRemotePeople map[string]vocab.ActivityStreamsPerson
|
testRemotePeople map[string]vocab.ActivityStreamsPerson
|
||||||
|
|
|
@ -31,7 +31,7 @@
|
||||||
type FederatorStandardTestSuite struct {
|
type FederatorStandardTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
testAccounts map[string]*gtsmodel.Account
|
testAccounts map[string]*gtsmodel.Account
|
||||||
testStatuses map[string]*gtsmodel.Status
|
testStatuses map[string]*gtsmodel.Status
|
||||||
|
|
|
@ -100,7 +100,7 @@ type Manager interface {
|
||||||
|
|
||||||
type manager struct {
|
type manager struct {
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
emojiWorker *concurrency.WorkerPool[*ProcessingEmoji]
|
emojiWorker *concurrency.WorkerPool[*ProcessingEmoji]
|
||||||
mediaWorker *concurrency.WorkerPool[*ProcessingMedia]
|
mediaWorker *concurrency.WorkerPool[*ProcessingMedia]
|
||||||
stopCronJobs func() error
|
stopCronJobs func() error
|
||||||
|
@ -112,7 +112,7 @@ type manager struct {
|
||||||
// a limited number of media will be processed in parallel. The numbers of workers
|
// a limited number of media will be processed in parallel. The numbers of workers
|
||||||
// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
|
// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
|
||||||
// See internal/concurrency.NewWorkerPool() documentation for further information.
|
// See internal/concurrency.NewWorkerPool() documentation for further information.
|
||||||
func NewManager(database db.DB, storage storage.Driver) (Manager, error) {
|
func NewManager(database db.DB, storage *storage.Driver) (Manager, error) {
|
||||||
m := &manager{
|
m := &manager{
|
||||||
db: database,
|
db: database,
|
||||||
storage: storage,
|
storage: storage,
|
||||||
|
|
|
@ -927,14 +927,19 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
|
||||||
temp := fmt.Sprintf("%s/gotosocial-test", os.TempDir())
|
temp := fmt.Sprintf("%s/gotosocial-test", os.TempDir())
|
||||||
defer os.RemoveAll(temp)
|
defer os.RemoveAll(temp)
|
||||||
|
|
||||||
diskStorage, err := kv.OpenDisk(temp, &storage.DiskConfig{
|
disk, err := storage.OpenDisk(temp, &storage.DiskConfig{
|
||||||
LockFile: path.Join(temp, "store.lock"),
|
LockFile: path.Join(temp, "store.lock"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
diskManager, err := media.NewManager(suite.db, >sstorage.Local{KVStore: diskStorage})
|
storage := >sstorage.Driver{
|
||||||
|
KVStore: kv.New(disk),
|
||||||
|
Storage: disk,
|
||||||
|
}
|
||||||
|
|
||||||
|
diskManager, err := media.NewManager(suite.db, storage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -974,7 +979,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
|
||||||
suite.NotNil(dbAttachment)
|
suite.NotNil(dbAttachment)
|
||||||
|
|
||||||
// make sure the processed file is in storage
|
// make sure the processed file is in storage
|
||||||
processedFullBytes, err := diskStorage.Get(ctx, attachment.File.Path)
|
processedFullBytes, err := storage.Get(ctx, attachment.File.Path)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.NotEmpty(processedFullBytes)
|
suite.NotEmpty(processedFullBytes)
|
||||||
|
|
||||||
|
@ -987,7 +992,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
|
||||||
suite.Equal(processedFullBytesExpected, processedFullBytes)
|
suite.Equal(processedFullBytesExpected, processedFullBytes)
|
||||||
|
|
||||||
// now do the same for the thumbnail and make sure it's what we expected
|
// now do the same for the thumbnail and make sure it's what we expected
|
||||||
processedThumbnailBytes, err := diskStorage.Get(ctx, attachment.Thumbnail.Path)
|
processedThumbnailBytes, err := storage.Get(ctx, attachment.Thumbnail.Path)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.NotEmpty(processedThumbnailBytes)
|
suite.NotEmpty(processedThumbnailBytes)
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ type MediaStandardTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
|
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
manager media.Manager
|
manager media.Manager
|
||||||
testAttachments map[string]*gtsmodel.MediaAttachment
|
testAttachments map[string]*gtsmodel.MediaAttachment
|
||||||
testAccounts map[string]*gtsmodel.Account
|
testAccounts map[string]*gtsmodel.Account
|
||||||
|
|
|
@ -68,7 +68,7 @@ type ProcessingEmoji struct {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
database db.DB
|
database db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
|
|
||||||
err error // error created during processing, if any
|
err error // error created during processing, if any
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ type ProcessingMedia struct {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
database db.DB
|
database db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
|
|
||||||
err error // error created during processing, if any
|
err error // error created during processing, if any
|
||||||
|
|
||||||
|
|
|
@ -163,7 +163,7 @@ func (r *lengthReader) Read(b []byte) (int, error) {
|
||||||
// putStream either puts a file with a known fileSize into storage directly, and returns the
|
// putStream either puts a file with a known fileSize into storage directly, and returns the
|
||||||
// fileSize unchanged, or it wraps the reader with a lengthReader and returns the discovered
|
// fileSize unchanged, or it wraps the reader with a lengthReader and returns the discovered
|
||||||
// fileSize.
|
// fileSize.
|
||||||
func putStream(ctx context.Context, storage storage.Driver, key string, r io.Reader, fileSize int64) (int64, error) {
|
func putStream(ctx context.Context, storage *storage.Driver, key string, r io.Reader, fileSize int64) (int64, error) {
|
||||||
if fileSize > 0 {
|
if fileSize > 0 {
|
||||||
return fileSize, storage.PutStream(ctx, key, r)
|
return fileSize, storage.PutStream(ctx, key, r)
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ type AccountStandardTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
oauthServer oauth.Server
|
oauthServer oauth.Server
|
||||||
fromClientAPIChan chan messages.FromClientAPI
|
fromClientAPIChan chan messages.FromClientAPI
|
||||||
|
|
|
@ -51,12 +51,12 @@ type processor struct {
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
transportController transport.Controller
|
transportController transport.Controller
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
db db.DB
|
db db.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new media processor.
|
// New returns a new media processor.
|
||||||
func New(db db.DB, tc typeutils.TypeConverter, mediaManager media.Manager, transportController transport.Controller, storage storage.Driver) Processor {
|
func New(db db.DB, tc typeutils.TypeConverter, mediaManager media.Manager, transportController transport.Controller, storage *storage.Driver) Processor {
|
||||||
return &processor{
|
return &processor{
|
||||||
tc: tc,
|
tc: tc,
|
||||||
mediaManager: mediaManager,
|
mediaManager: mediaManager,
|
||||||
|
|
|
@ -37,7 +37,7 @@ type MediaStandardTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
transportController transport.Controller
|
transportController transport.Controller
|
||||||
|
|
||||||
|
|
|
@ -273,7 +273,7 @@ type processor struct {
|
||||||
tc typeutils.TypeConverter
|
tc typeutils.TypeConverter
|
||||||
oauthServer oauth.Server
|
oauthServer oauth.Server
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
statusTimelines timeline.Manager
|
statusTimelines timeline.Manager
|
||||||
db db.DB
|
db db.DB
|
||||||
filter visibility.Filter
|
filter visibility.Filter
|
||||||
|
@ -297,7 +297,7 @@ func NewProcessor(
|
||||||
federator federation.Federator,
|
federator federation.Federator,
|
||||||
oauthServer oauth.Server,
|
oauthServer oauth.Server,
|
||||||
mediaManager media.Manager,
|
mediaManager media.Manager,
|
||||||
storage storage.Driver,
|
storage *storage.Driver,
|
||||||
db db.DB,
|
db db.DB,
|
||||||
emailSender email.Sender,
|
emailSender email.Sender,
|
||||||
clientWorker *concurrency.WorkerPool[messages.FromClientAPI],
|
clientWorker *concurrency.WorkerPool[messages.FromClientAPI],
|
||||||
|
|
|
@ -39,7 +39,7 @@ type ProcessingStandardTestSuite struct {
|
||||||
// standard suite interfaces
|
// standard suite interfaces
|
||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
typeconverter typeutils.TypeConverter
|
typeconverter typeutils.TypeConverter
|
||||||
httpClient *testrig.MockHTTPClient
|
httpClient *testrig.MockHTTPClient
|
||||||
|
|
|
@ -41,7 +41,7 @@ type StatusStandardTestSuite struct {
|
||||||
db db.DB
|
db db.DB
|
||||||
typeConverter typeutils.TypeConverter
|
typeConverter typeutils.TypeConverter
|
||||||
tc transport.Controller
|
tc transport.Controller
|
||||||
storage storage.Driver
|
storage *storage.Driver
|
||||||
mediaManager media.Manager
|
mediaManager media.Manager
|
||||||
federator federation.Federator
|
federator federation.Federator
|
||||||
clientWorker *concurrency.WorkerPool[messages.FromClientAPI]
|
clientWorker *concurrency.WorkerPool[messages.FromClientAPI]
|
||||||
|
|
|
@ -1,34 +0,0 @@
|
||||||
/*
|
|
||||||
GoToSocial
|
|
||||||
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
|
|
||||||
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU Affero General Public License as published by
|
|
||||||
the Free Software Foundation, either version 3 of the License, or
|
|
||||||
(at your option) any later version.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU Affero General Public License
|
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"net/url"
|
|
||||||
|
|
||||||
"codeberg.org/gruf/go-store/v2/kv"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Local struct {
|
|
||||||
*kv.KVStore
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Local) URL(ctx context.Context, key string) *url.URL {
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,50 +0,0 @@
|
||||||
/*
|
|
||||||
GoToSocial
|
|
||||||
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
|
|
||||||
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU Affero General Public License as published by
|
|
||||||
the Free Software Foundation, either version 3 of the License, or
|
|
||||||
(at your option) any later version.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU Affero General Public License
|
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"mime"
|
|
||||||
"net/url"
|
|
||||||
"path"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"codeberg.org/gruf/go-store/v2/kv"
|
|
||||||
"codeberg.org/gruf/go-store/v2/storage"
|
|
||||||
)
|
|
||||||
|
|
||||||
type S3 struct {
|
|
||||||
Proxy bool
|
|
||||||
Bucket string
|
|
||||||
Storage *storage.S3Storage
|
|
||||||
*kv.KVStore
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *S3) URL(ctx context.Context, key string) *url.URL {
|
|
||||||
if s.Proxy {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// it's safe to ignore the error here, as we just fall back to fetching the file if URL request fails
|
|
||||||
url, _ := s.Storage.Client().PresignedGetObject(ctx, s.Bucket, key, time.Hour, url.Values{
|
|
||||||
"response-content-type": []string{mime.TypeByExtension(path.Ext(key))},
|
|
||||||
})
|
|
||||||
|
|
||||||
return url
|
|
||||||
}
|
|
|
@ -20,11 +20,11 @@
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"mime"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
"codeberg.org/gruf/go-store/v2/kv"
|
"codeberg.org/gruf/go-store/v2/kv"
|
||||||
"codeberg.org/gruf/go-store/v2/storage"
|
"codeberg.org/gruf/go-store/v2/storage"
|
||||||
|
@ -33,32 +33,50 @@
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/config"
|
"github.com/superseriousbusiness/gotosocial/internal/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
// ErrAlreadyExists is a ptr to underlying storage.ErrAlreadyExists,
|
||||||
ErrNotSupported = errors.New("driver does not suppport functionality")
|
// to put the related errors in the same package as our storage wrapper.
|
||||||
ErrAlreadyExists = storage.ErrAlreadyExists
|
var ErrAlreadyExists = storage.ErrAlreadyExists
|
||||||
)
|
|
||||||
|
|
||||||
// Driver implements the functionality to store and retrieve blobs
|
// Driver wraps a kv.KVStore to also provide S3 presigned GET URLs.
|
||||||
// (images,video,audio)
|
type Driver struct {
|
||||||
type Driver interface {
|
// Underlying storage
|
||||||
Get(ctx context.Context, key string) ([]byte, error)
|
*kv.KVStore
|
||||||
GetStream(ctx context.Context, key string) (io.ReadCloser, error)
|
Storage storage.Storage
|
||||||
PutStream(ctx context.Context, key string, r io.Reader) error
|
|
||||||
Put(ctx context.Context, key string, value []byte) error
|
// S3-only parameters
|
||||||
Delete(ctx context.Context, key string) error
|
Proxy bool
|
||||||
URL(ctx context.Context, key string) *url.URL
|
Bucket string
|
||||||
}
|
}
|
||||||
|
|
||||||
func AutoConfig() (Driver, error) {
|
// URL will return a presigned GET object URL, but only if running on S3 storage with proxying disabled.
|
||||||
switch config.GetStorageBackend() {
|
func (d *Driver) URL(ctx context.Context, key string) *url.URL {
|
||||||
|
// Check whether S3 *without* proxying is enabled
|
||||||
|
s3, ok := d.Storage.(*storage.S3Storage)
|
||||||
|
if !ok || d.Proxy {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If URL request fails, fallback is to fetch the file. So ignore the error here
|
||||||
|
url, _ := s3.Client().PresignedGetObject(ctx, d.Bucket, key, time.Hour, url.Values{
|
||||||
|
"response-content-type": []string{mime.TypeByExtension(path.Ext(key))},
|
||||||
|
})
|
||||||
|
|
||||||
|
return url
|
||||||
|
}
|
||||||
|
|
||||||
|
func AutoConfig() (*Driver, error) {
|
||||||
|
var st storage.Storage
|
||||||
|
|
||||||
|
switch backend := config.GetStorageBackend(); backend {
|
||||||
case "s3":
|
case "s3":
|
||||||
|
// Load runtime configuration
|
||||||
endpoint := config.GetStorageS3Endpoint()
|
endpoint := config.GetStorageS3Endpoint()
|
||||||
access := config.GetStorageS3AccessKey()
|
access := config.GetStorageS3AccessKey()
|
||||||
secret := config.GetStorageS3SecretKey()
|
secret := config.GetStorageS3SecretKey()
|
||||||
secure := config.GetStorageS3UseSSL()
|
secure := config.GetStorageS3UseSSL()
|
||||||
bucket := config.GetStorageS3BucketName()
|
bucket := config.GetStorageS3BucketName()
|
||||||
proxy := config.GetStorageS3Proxy()
|
|
||||||
|
|
||||||
|
// Open the s3 storage implementation
|
||||||
s3, err := storage.OpenS3(endpoint, bucket, &storage.S3Config{
|
s3, err := storage.OpenS3(endpoint, bucket, &storage.S3Config{
|
||||||
CoreOpts: minio.Options{
|
CoreOpts: minio.Options{
|
||||||
Creds: credentials.NewStaticV4(access, secret, ""),
|
Creds: credentials.NewStaticV4(access, secret, ""),
|
||||||
|
@ -75,15 +93,14 @@ func AutoConfig() (Driver, error) {
|
||||||
return nil, fmt.Errorf("error opening s3 storage: %w", err)
|
return nil, fmt.Errorf("error opening s3 storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &S3{
|
// Set storage impl
|
||||||
Proxy: proxy,
|
st = s3
|
||||||
Bucket: bucket,
|
|
||||||
Storage: s3,
|
|
||||||
KVStore: kv.New(s3),
|
|
||||||
}, nil
|
|
||||||
case "local":
|
case "local":
|
||||||
|
// Load runtime configuration
|
||||||
basePath := config.GetStorageLocalBasePath()
|
basePath := config.GetStorageLocalBasePath()
|
||||||
|
|
||||||
|
// Open the disk storage implementation
|
||||||
disk, err := storage.OpenDisk(basePath, &storage.DiskConfig{
|
disk, err := storage.OpenDisk(basePath, &storage.DiskConfig{
|
||||||
// Put the store lockfile in the storage dir itself.
|
// Put the store lockfile in the storage dir itself.
|
||||||
// Normally this would not be safe, since we could end up
|
// Normally this would not be safe, since we could end up
|
||||||
|
@ -96,7 +113,17 @@ func AutoConfig() (Driver, error) {
|
||||||
return nil, fmt.Errorf("error opening disk storage: %w", err)
|
return nil, fmt.Errorf("error opening disk storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Local{kv.New(disk)}, nil
|
// Set storage impl
|
||||||
|
st = disk
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid storage backend: %s", backend)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("invalid storage backend %s", config.GetStorageBackend())
|
|
||||||
|
return &Driver{
|
||||||
|
KVStore: kv.New(st),
|
||||||
|
Proxy: config.GetStorageS3Proxy(),
|
||||||
|
Bucket: config.GetStorageS3BucketName(),
|
||||||
|
Storage: st,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,6 @@
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTestFederator returns a federator with the given database and (mock!!) transport controller.
|
// NewTestFederator returns a federator with the given database and (mock!!) transport controller.
|
||||||
func NewTestFederator(db db.DB, tc transport.Controller, storage storage.Driver, mediaManager media.Manager, fedWorker *concurrency.WorkerPool[messages.FromFederator]) federation.Federator {
|
func NewTestFederator(db db.DB, tc transport.Controller, storage *storage.Driver, mediaManager media.Manager, fedWorker *concurrency.WorkerPool[messages.FromFederator]) federation.Federator {
|
||||||
return federation.NewFederator(db, NewTestFederatingDB(db, fedWorker), tc, NewTestTypeConverter(db), mediaManager)
|
return federation.NewFederator(db, NewTestFederatingDB(db, fedWorker), tc, NewTestTypeConverter(db), mediaManager)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTestMediaManager returns a media handler with the default test config, and the given db and storage.
|
// NewTestMediaManager returns a media handler with the default test config, and the given db and storage.
|
||||||
func NewTestMediaManager(db db.DB, storage storage.Driver) media.Manager {
|
func NewTestMediaManager(db db.DB, storage *storage.Driver) media.Manager {
|
||||||
m, err := media.NewManager(db, storage)
|
m, err := media.NewManager(db, storage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
|
@ -30,6 +30,6 @@
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTestProcessor returns a Processor suitable for testing purposes
|
// NewTestProcessor returns a Processor suitable for testing purposes
|
||||||
func NewTestProcessor(db db.DB, storage storage.Driver, federator federation.Federator, emailSender email.Sender, mediaManager media.Manager, clientWorker *concurrency.WorkerPool[messages.FromClientAPI], fedWorker *concurrency.WorkerPool[messages.FromFederator]) processing.Processor {
|
func NewTestProcessor(db db.DB, storage *storage.Driver, federator federation.Federator, emailSender email.Sender, mediaManager media.Manager, clientWorker *concurrency.WorkerPool[messages.FromClientAPI], fedWorker *concurrency.WorkerPool[messages.FromFederator]) processing.Processor {
|
||||||
return processing.NewProcessor(NewTestTypeConverter(db), federator, NewTestOauthServer(db), mediaManager, storage, db, emailSender, clientWorker, fedWorker)
|
return processing.NewProcessor(NewTestTypeConverter(db), federator, NewTestOauthServer(db), mediaManager, storage, db, emailSender, clientWorker, fedWorker)
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,15 +33,15 @@
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewInMemoryStorage returns a new in memory storage with the default test config
|
// NewInMemoryStorage returns a new in memory storage with the default test config
|
||||||
func NewInMemoryStorage() *gtsstorage.Local {
|
func NewInMemoryStorage() *gtsstorage.Driver {
|
||||||
storage, err := kv.OpenStorage(storage.OpenMemory(200, false))
|
storage := storage.OpenMemory(200, false)
|
||||||
if err != nil {
|
return >sstorage.Driver{
|
||||||
panic(err)
|
KVStore: kv.New(storage),
|
||||||
|
Storage: storage,
|
||||||
}
|
}
|
||||||
return >sstorage.Local{KVStore: storage}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewS3Storage() gtsstorage.Driver {
|
func NewS3Storage() *gtsstorage.Driver {
|
||||||
endpoint := config.GetStorageS3Endpoint()
|
endpoint := config.GetStorageS3Endpoint()
|
||||||
access := config.GetStorageS3AccessKey()
|
access := config.GetStorageS3AccessKey()
|
||||||
secret := config.GetStorageS3SecretKey()
|
secret := config.GetStorageS3SecretKey()
|
||||||
|
@ -65,16 +65,16 @@ func NewS3Storage() gtsstorage.Driver {
|
||||||
panic(fmt.Errorf("error opening s3 storage: %w", err))
|
panic(fmt.Errorf("error opening s3 storage: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return >sstorage.S3{
|
return >sstorage.Driver{
|
||||||
|
KVStore: kv.New(s3),
|
||||||
|
Storage: s3,
|
||||||
Proxy: proxy,
|
Proxy: proxy,
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Storage: s3,
|
|
||||||
KVStore: kv.New(s3),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StandardStorageSetup populates the storage with standard test entries from the given directory.
|
// StandardStorageSetup populates the storage with standard test entries from the given directory.
|
||||||
func StandardStorageSetup(s gtsstorage.Driver, relativePath string) {
|
func StandardStorageSetup(storage *gtsstorage.Driver, relativePath string) {
|
||||||
storedA := newTestStoredAttachments()
|
storedA := newTestStoredAttachments()
|
||||||
a := NewTestAttachments()
|
a := NewTestAttachments()
|
||||||
for k, paths := range storedA {
|
for k, paths := range storedA {
|
||||||
|
@ -90,14 +90,14 @@ func StandardStorageSetup(s gtsstorage.Driver, relativePath string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if err := s.Put(context.TODO(), pathOriginal, bOriginal); err != nil {
|
if err := storage.Put(context.TODO(), pathOriginal, bOriginal); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
bSmall, err := os.ReadFile(fmt.Sprintf("%s/%s", relativePath, filenameSmall))
|
bSmall, err := os.ReadFile(fmt.Sprintf("%s/%s", relativePath, filenameSmall))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if err := s.Put(context.TODO(), pathSmall, bSmall); err != nil {
|
if err := storage.Put(context.TODO(), pathSmall, bSmall); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -117,14 +117,14 @@ func StandardStorageSetup(s gtsstorage.Driver, relativePath string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if err := s.Put(context.TODO(), pathOriginal, bOriginal); err != nil {
|
if err := storage.Put(context.TODO(), pathOriginal, bOriginal); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
bStatic, err := os.ReadFile(fmt.Sprintf("%s/%s", relativePath, filenameStatic))
|
bStatic, err := os.ReadFile(fmt.Sprintf("%s/%s", relativePath, filenameStatic))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if err := s.Put(context.TODO(), pathStatic, bStatic); err != nil {
|
if err := storage.Put(context.TODO(), pathStatic, bStatic); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,24 +133,27 @@ func StandardStorageSetup(s gtsstorage.Driver, relativePath string) {
|
||||||
// StandardStorageTeardown deletes everything in storage so that it's clean for
|
// StandardStorageTeardown deletes everything in storage so that it's clean for
|
||||||
// the next test
|
// the next test
|
||||||
// nolint:gocritic // complains about the type switch, but it's the cleanest solution
|
// nolint:gocritic // complains about the type switch, but it's the cleanest solution
|
||||||
func StandardStorageTeardown(s gtsstorage.Driver) {
|
func StandardStorageTeardown(storage *gtsstorage.Driver) {
|
||||||
defer os.RemoveAll(path.Join(os.TempDir(), "gotosocial"))
|
defer os.RemoveAll(path.Join(os.TempDir(), "gotosocial"))
|
||||||
|
|
||||||
switch st := s.(type) {
|
// Open a storage iterator
|
||||||
case *gtsstorage.Local:
|
iter, err := storage.Iterator(context.Background(), nil)
|
||||||
iter, err := st.KVStore.Iterator(context.Background(), nil)
|
if err != nil {
|
||||||
if err != nil {
|
panic(err)
|
||||||
panic(err)
|
}
|
||||||
}
|
|
||||||
keys := []string{}
|
var keys []string
|
||||||
for iter.Next() {
|
|
||||||
keys = append(keys, iter.Key())
|
for iter.Next() {
|
||||||
}
|
// Collate all of the storage keys
|
||||||
iter.Release()
|
keys = append(keys, iter.Key())
|
||||||
for _, k := range keys {
|
}
|
||||||
if err := s.Delete(context.TODO(), k); err != nil {
|
|
||||||
panic(err)
|
// Done with iter
|
||||||
}
|
iter.Release()
|
||||||
}
|
|
||||||
|
for _, key := range keys {
|
||||||
|
// Ignore errors, we just want to attempt delete all
|
||||||
|
_ = storage.Delete(context.Background(), key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue