diff --git a/internal/db/bundb/migrations/20241121121623_enum_strings_to_ints.go b/internal/db/bundb/migrations/20241121121623_enum_strings_to_ints.go index ef292fcb9..113958437 100644 --- a/internal/db/bundb/migrations/20241121121623_enum_strings_to_ints.go +++ b/internal/db/bundb/migrations/20241121121623_enum_strings_to_ints.go @@ -30,107 +30,118 @@ func init() { up := func(ctx context.Context, db *bun.DB) error { - return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + // Status visibility type indices. + var statusVisIndices = []struct { + name string + cols []string + order string + }{ + { + name: "statuses_visibility_idx", + cols: []string{"visibility"}, + order: "", + }, + { + name: "statuses_profile_web_view_idx", + cols: []string{"account_id", "visibility"}, + order: "id DESC", + }, + { + name: "statuses_public_timeline_idx", + cols: []string{"visibility"}, + order: "id DESC", + }, + } - // Status visibility type indices. - var statusVisIndices = []struct { - name string - cols []string - order string - }{ - { - name: "statuses_visibility_idx", - cols: []string{"visibility"}, - order: "", - }, - { - name: "statuses_profile_web_view_idx", - cols: []string{"account_id", "visibility"}, - order: "id DESC", - }, - { - name: "statuses_public_timeline_idx", - cols: []string{"visibility"}, - order: "id DESC", - }, - } - - // Tables with visibility types. - var visTables = []struct { - Table string - Column string - Default *new_gtsmodel.Visibility - IndexCleanupCallback func(ctx context.Context, tx bun.Tx) error - }{ - { - Table: "statuses", - Column: "visibility", - IndexCleanupCallback: func(ctx context.Context, tx bun.Tx) error { - // After new column has been created and - // populated, drop indices relying on old column. - for _, index := range statusVisIndices { - log.Infof(ctx, "dropping old index %s...", index.name) - if _, err := tx.NewDropIndex(). - Index(index.name). - Exec(ctx); err != nil { - return err - } + // Tables with visibility types. + var visTables = []struct { + Table string + Column string + Default *new_gtsmodel.Visibility + IndexCleanupCallback func(ctx context.Context, tx bun.Tx) error + BatchByColumn string + }{ + { + Table: "statuses", + Column: "visibility", + IndexCleanupCallback: func(ctx context.Context, tx bun.Tx) error { + // After new column has been created and + // populated, drop indices relying on old column. + for _, index := range statusVisIndices { + log.Infof(ctx, "dropping old index %s...", index.name) + if _, err := tx.NewDropIndex(). + Index(index.name). + Exec(ctx); err != nil { + return err } - return nil - }, + } + return nil }, - { - Table: "sin_bin_statuses", - Column: "visibility", - }, - { - Table: "account_settings", - Column: "privacy", - Default: util.Ptr(new_gtsmodel.VisibilityDefault)}, - { - Table: "account_settings", - Column: "web_visibility", - Default: util.Ptr(new_gtsmodel.VisibilityDefault)}, - } + BatchByColumn: "id", + }, + { + Table: "sin_bin_statuses", + Column: "visibility", + BatchByColumn: "id", + }, + { + Table: "account_settings", + Column: "privacy", + Default: util.Ptr(new_gtsmodel.VisibilityDefault), + BatchByColumn: "account_id", + }, - // Get the mapping of old enum string values to new integer values. - visibilityMapping := visibilityEnumMapping[old_gtsmodel.Visibility]() + { + Table: "account_settings", + Column: "web_visibility", + Default: util.Ptr(new_gtsmodel.VisibilityDefault), + BatchByColumn: "account_id", + }, + } - // Convert all visibility tables. - for _, table := range visTables { - if err := convertEnums(ctx, tx, table.Table, table.Column, - visibilityMapping, table.Default, table.IndexCleanupCallback); err != nil { - return err - } - } + // Get the mapping of old enum string values to new integer values. + visibilityMapping := visibilityEnumMapping[old_gtsmodel.Visibility]() - // Recreate the visibility indices. - log.Info(ctx, "creating new visibility indexes...") - for _, index := range statusVisIndices { - log.Infof(ctx, "creating new index %s...", index.name) - q := tx.NewCreateIndex(). - Table("statuses"). - Index(index.name). - Column(index.cols...) - if index.order != "" { - q = q.ColumnExpr(index.order) - } - if _, err := q.Exec(ctx); err != nil { - return err - } - } + // Convert all visibility tables. + for _, table := range visTables { - // Get the mapping of old enum string values to the new integer value types. - notificationMapping := notificationEnumMapping[old_gtsmodel.NotificationType]() - - // Migrate over old notifications table column over to new column type. - if err := convertEnums(ctx, tx, "notifications", "notification_type", //nolint:revive - notificationMapping, nil, nil); err != nil { + // Perform each enum table conversion within its own transaction. + if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + return convertEnums(ctx, tx, table.Table, table.Column, + visibilityMapping, table.Default, table.IndexCleanupCallback, table.BatchByColumn) + }); err != nil { return err } + } - return nil - }) + // Recreate the visibility indices. + log.Info(ctx, "creating new visibility indexes...") + for _, index := range statusVisIndices { + log.Infof(ctx, "creating new index %s...", index.name) + q := db.NewCreateIndex(). + Table("statuses"). + Index(index.name). + Column(index.cols...) + if index.order != "" { + q = q.ColumnExpr(index.order) + } + if _, err := q.Exec(ctx); err != nil { + return err + } + } + + // Get the mapping of old enum string values to the new integer value types. + notificationMapping := notificationEnumMapping[old_gtsmodel.NotificationType]() + + // Migrate over old notifications table column to new type in tx. + if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + return convertEnums(ctx, tx, "notifications", "notification_type", //nolint:revive + notificationMapping, nil, nil, "id") + }); err != nil { + return err + } + + return nil } down := func(ctx context.Context, db *bun.DB) error { diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go index 7f8b57c42..6ffcdd09d 100644 --- a/internal/db/bundb/migrations/util.go +++ b/internal/db/bundb/migrations/util.go @@ -22,11 +22,13 @@ "errors" "fmt" "reflect" + "slices" "strconv" "strings" "codeberg.org/gruf/go-byteutil" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/uptrace/bun" "github.com/uptrace/bun/dialect" @@ -46,6 +48,7 @@ func convertEnums[OldType ~string, NewType ~int16]( mapping map[OldType]NewType, defaultValue *NewType, indexCleanupCallback func(context.Context, bun.Tx) error, + batchByColumn string, ) error { if len(mapping) == 0 { return errors.New("empty mapping") @@ -87,7 +90,7 @@ func convertEnums[OldType ~string, NewType ~int16]( var qbuf byteutil.Buffer // Prepare a singular UPDATE statement using - // SET $newColumn = (CASE $column WHEN $old THEN $new ... END) + // SET $newColumn = (CASE $column WHEN $old THEN $new ... END). qbuf.B = append(qbuf.B, "UPDATE ? SET ? = (CASE ? "...) args = append(args, bun.Ident(table)) args = append(args, bun.Ident(newColumn)) @@ -99,16 +102,82 @@ func convertEnums[OldType ~string, NewType ~int16]( qbuf.B = append(qbuf.B, "ELSE ? END)"...) args = append(args, *defaultValue) - // Execute the prepared raw query with arguments. - res, err := tx.NewRaw(qbuf.String(), args...).Exec(ctx) - if err != nil { - return gtserror.Newf("error updating old column values: %w", err) + // Serialize it here to be + // used as the base for each + // set of batch queries below. + baseQStr := string(qbuf.B) + baseArgs := args + + // Query batch size + // in number of rows. + const batchsz = 5000 + + // Stores highest batch value + // used in iterate queries, + // starting at highest possible. + highest := id.Highest + + // Total updated rows. + var updated int + + for { + // Limit to batchsz + // items at once. + batchQ := tx. + NewSelect(). + Table(table). + Column(batchByColumn). + Where("? < ?", bun.Ident(batchByColumn), highest). + OrderExpr("? DESC", bun.Ident(batchByColumn)). + Limit(batchsz) + + // Finalize UPDATE to operate on this batch only. + qStr := baseQStr + " WHERE ? IN (?)" + args := append( + slices.Clone(baseArgs), + bun.Ident(batchByColumn), + batchQ, + ) + + // Execute the prepared raw query with arguments. + res, err := tx.NewRaw(qStr, args...).Exec(ctx) + if err != nil { + return gtserror.Newf("error updating old column values: %w", err) + } + + // Check how many items we updated. + thisUpdated, err := res.RowsAffected() + if err != nil { + return gtserror.Newf("error counting affected rows: %w", err) + } + + if thisUpdated == 0 { + // Nothing updated + // means we're done. + break + } + + // Update the overall count. + updated += int(thisUpdated) + + // Log helpful message to admin. + log.Infof(ctx, "migrated %d of %d %s (up to %s)", + updated, total, table, highest) + + // Get next highest + // id for next batch. + if err := tx. + NewSelect(). + With("batch_query", batchQ). + ColumnExpr("min(?) FROM ?", bun.Ident(batchByColumn), bun.Ident("batch_query")). + Scan(ctx, &highest); err != nil { + return gtserror.Newf("error selecting next highest: %w", err) + } } - // Count number items updated. - updated, _ := res.RowsAffected() if total != int(updated) { - log.Warnf(ctx, "total=%d does not match updated=%d", total, updated) + // Return error here in order to rollback the whole transaction. + return fmt.Errorf("total=%d does not match updated=%d", total, updated) } // Run index cleanup callback if set.