[chore/performance] Batch migration queries (#3798)

* separate enum migrations into their own individual transactions

* pee poo

* some performance tweaks and adding more comments

* batch

---------

Co-authored-by: kim <grufwub@gmail.com>
This commit is contained in:
tobi 2025-02-15 12:43:12 +01:00 committed by GitHub
parent ebbdeee0bb
commit 5dc8009e30
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 179 additions and 99 deletions

View file

@ -30,107 +30,118 @@
func init() { func init() {
up := func(ctx context.Context, db *bun.DB) error { up := func(ctx context.Context, db *bun.DB) error {
return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { // Status visibility type indices.
var statusVisIndices = []struct {
name string
cols []string
order string
}{
{
name: "statuses_visibility_idx",
cols: []string{"visibility"},
order: "",
},
{
name: "statuses_profile_web_view_idx",
cols: []string{"account_id", "visibility"},
order: "id DESC",
},
{
name: "statuses_public_timeline_idx",
cols: []string{"visibility"},
order: "id DESC",
},
}
// Status visibility type indices. // Tables with visibility types.
var statusVisIndices = []struct { var visTables = []struct {
name string Table string
cols []string Column string
order string Default *new_gtsmodel.Visibility
}{ IndexCleanupCallback func(ctx context.Context, tx bun.Tx) error
{ BatchByColumn string
name: "statuses_visibility_idx", }{
cols: []string{"visibility"}, {
order: "", Table: "statuses",
}, Column: "visibility",
{ IndexCleanupCallback: func(ctx context.Context, tx bun.Tx) error {
name: "statuses_profile_web_view_idx", // After new column has been created and
cols: []string{"account_id", "visibility"}, // populated, drop indices relying on old column.
order: "id DESC", for _, index := range statusVisIndices {
}, log.Infof(ctx, "dropping old index %s...", index.name)
{ if _, err := tx.NewDropIndex().
name: "statuses_public_timeline_idx", Index(index.name).
cols: []string{"visibility"}, Exec(ctx); err != nil {
order: "id DESC", return err
},
}
// Tables with visibility types.
var visTables = []struct {
Table string
Column string
Default *new_gtsmodel.Visibility
IndexCleanupCallback func(ctx context.Context, tx bun.Tx) error
}{
{
Table: "statuses",
Column: "visibility",
IndexCleanupCallback: func(ctx context.Context, tx bun.Tx) error {
// After new column has been created and
// populated, drop indices relying on old column.
for _, index := range statusVisIndices {
log.Infof(ctx, "dropping old index %s...", index.name)
if _, err := tx.NewDropIndex().
Index(index.name).
Exec(ctx); err != nil {
return err
}
} }
return nil }
}, return nil
}, },
{ BatchByColumn: "id",
Table: "sin_bin_statuses", },
Column: "visibility", {
}, Table: "sin_bin_statuses",
{ Column: "visibility",
Table: "account_settings", BatchByColumn: "id",
Column: "privacy", },
Default: util.Ptr(new_gtsmodel.VisibilityDefault)}, {
{ Table: "account_settings",
Table: "account_settings", Column: "privacy",
Column: "web_visibility", Default: util.Ptr(new_gtsmodel.VisibilityDefault),
Default: util.Ptr(new_gtsmodel.VisibilityDefault)}, BatchByColumn: "account_id",
} },
// Get the mapping of old enum string values to new integer values. {
visibilityMapping := visibilityEnumMapping[old_gtsmodel.Visibility]() Table: "account_settings",
Column: "web_visibility",
Default: util.Ptr(new_gtsmodel.VisibilityDefault),
BatchByColumn: "account_id",
},
}
// Convert all visibility tables. // Get the mapping of old enum string values to new integer values.
for _, table := range visTables { visibilityMapping := visibilityEnumMapping[old_gtsmodel.Visibility]()
if err := convertEnums(ctx, tx, table.Table, table.Column,
visibilityMapping, table.Default, table.IndexCleanupCallback); err != nil {
return err
}
}
// Recreate the visibility indices. // Convert all visibility tables.
log.Info(ctx, "creating new visibility indexes...") for _, table := range visTables {
for _, index := range statusVisIndices {
log.Infof(ctx, "creating new index %s...", index.name)
q := tx.NewCreateIndex().
Table("statuses").
Index(index.name).
Column(index.cols...)
if index.order != "" {
q = q.ColumnExpr(index.order)
}
if _, err := q.Exec(ctx); err != nil {
return err
}
}
// Get the mapping of old enum string values to the new integer value types. // Perform each enum table conversion within its own transaction.
notificationMapping := notificationEnumMapping[old_gtsmodel.NotificationType]() if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
return convertEnums(ctx, tx, table.Table, table.Column,
// Migrate over old notifications table column over to new column type. visibilityMapping, table.Default, table.IndexCleanupCallback, table.BatchByColumn)
if err := convertEnums(ctx, tx, "notifications", "notification_type", //nolint:revive }); err != nil {
notificationMapping, nil, nil); err != nil {
return err return err
} }
}
return nil // Recreate the visibility indices.
}) log.Info(ctx, "creating new visibility indexes...")
for _, index := range statusVisIndices {
log.Infof(ctx, "creating new index %s...", index.name)
q := db.NewCreateIndex().
Table("statuses").
Index(index.name).
Column(index.cols...)
if index.order != "" {
q = q.ColumnExpr(index.order)
}
if _, err := q.Exec(ctx); err != nil {
return err
}
}
// Get the mapping of old enum string values to the new integer value types.
notificationMapping := notificationEnumMapping[old_gtsmodel.NotificationType]()
// Migrate over old notifications table column to new type in tx.
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
return convertEnums(ctx, tx, "notifications", "notification_type", //nolint:revive
notificationMapping, nil, nil, "id")
}); err != nil {
return err
}
return nil
} }
down := func(ctx context.Context, db *bun.DB) error { down := func(ctx context.Context, db *bun.DB) error {

View file

@ -22,11 +22,13 @@
"errors" "errors"
"fmt" "fmt"
"reflect" "reflect"
"slices"
"strconv" "strconv"
"strings" "strings"
"codeberg.org/gruf/go-byteutil" "codeberg.org/gruf/go-byteutil"
"github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/uptrace/bun" "github.com/uptrace/bun"
"github.com/uptrace/bun/dialect" "github.com/uptrace/bun/dialect"
@ -46,6 +48,7 @@ func convertEnums[OldType ~string, NewType ~int16](
mapping map[OldType]NewType, mapping map[OldType]NewType,
defaultValue *NewType, defaultValue *NewType,
indexCleanupCallback func(context.Context, bun.Tx) error, indexCleanupCallback func(context.Context, bun.Tx) error,
batchByColumn string,
) error { ) error {
if len(mapping) == 0 { if len(mapping) == 0 {
return errors.New("empty mapping") return errors.New("empty mapping")
@ -87,7 +90,7 @@ func convertEnums[OldType ~string, NewType ~int16](
var qbuf byteutil.Buffer var qbuf byteutil.Buffer
// Prepare a singular UPDATE statement using // Prepare a singular UPDATE statement using
// SET $newColumn = (CASE $column WHEN $old THEN $new ... END) // SET $newColumn = (CASE $column WHEN $old THEN $new ... END).
qbuf.B = append(qbuf.B, "UPDATE ? SET ? = (CASE ? "...) qbuf.B = append(qbuf.B, "UPDATE ? SET ? = (CASE ? "...)
args = append(args, bun.Ident(table)) args = append(args, bun.Ident(table))
args = append(args, bun.Ident(newColumn)) args = append(args, bun.Ident(newColumn))
@ -99,16 +102,82 @@ func convertEnums[OldType ~string, NewType ~int16](
qbuf.B = append(qbuf.B, "ELSE ? END)"...) qbuf.B = append(qbuf.B, "ELSE ? END)"...)
args = append(args, *defaultValue) args = append(args, *defaultValue)
// Execute the prepared raw query with arguments. // Serialize it here to be
res, err := tx.NewRaw(qbuf.String(), args...).Exec(ctx) // used as the base for each
if err != nil { // set of batch queries below.
return gtserror.Newf("error updating old column values: %w", err) baseQStr := string(qbuf.B)
baseArgs := args
// Query batch size
// in number of rows.
const batchsz = 5000
// Stores highest batch value
// used in iterate queries,
// starting at highest possible.
highest := id.Highest
// Total updated rows.
var updated int
for {
// Limit to batchsz
// items at once.
batchQ := tx.
NewSelect().
Table(table).
Column(batchByColumn).
Where("? < ?", bun.Ident(batchByColumn), highest).
OrderExpr("? DESC", bun.Ident(batchByColumn)).
Limit(batchsz)
// Finalize UPDATE to operate on this batch only.
qStr := baseQStr + " WHERE ? IN (?)"
args := append(
slices.Clone(baseArgs),
bun.Ident(batchByColumn),
batchQ,
)
// Execute the prepared raw query with arguments.
res, err := tx.NewRaw(qStr, args...).Exec(ctx)
if err != nil {
return gtserror.Newf("error updating old column values: %w", err)
}
// Check how many items we updated.
thisUpdated, err := res.RowsAffected()
if err != nil {
return gtserror.Newf("error counting affected rows: %w", err)
}
if thisUpdated == 0 {
// Nothing updated
// means we're done.
break
}
// Update the overall count.
updated += int(thisUpdated)
// Log helpful message to admin.
log.Infof(ctx, "migrated %d of %d %s (up to %s)",
updated, total, table, highest)
// Get next highest
// id for next batch.
if err := tx.
NewSelect().
With("batch_query", batchQ).
ColumnExpr("min(?) FROM ?", bun.Ident(batchByColumn), bun.Ident("batch_query")).
Scan(ctx, &highest); err != nil {
return gtserror.Newf("error selecting next highest: %w", err)
}
} }
// Count number items updated.
updated, _ := res.RowsAffected()
if total != int(updated) { if total != int(updated) {
log.Warnf(ctx, "total=%d does not match updated=%d", total, updated) // Return error here in order to rollback the whole transaction.
return fmt.Errorf("total=%d does not match updated=%d", total, updated)
} }
// Run index cleanup callback if set. // Run index cleanup callback if set.