diff --git a/doc/hooks.md b/doc/hooks.md index c1fe59453..b033d3212 100644 --- a/doc/hooks.md +++ b/doc/hooks.md @@ -76,6 +76,7 @@ The following variables are available on all hooks: - `GH_OST_HOOKS_HINT_OWNER` - copy of `--hooks-hint-owner` value - `GH_OST_HOOKS_HINT_TOKEN` - copy of `--hooks-hint-token` value - `GH_OST_DRY_RUN` - whether or not the `gh-ost` run is a dry run +- `GH_OST_REVERT` - whether or not `gh-ost` is running in revert mode The following variable are available on particular hooks: diff --git a/doc/resume.md b/doc/resume.md index 46d6ac909..6f12ab7a3 100644 --- a/doc/resume.md +++ b/doc/resume.md @@ -4,6 +4,7 @@ - The first `gh-ost` process was invoked with `--checkpoint` - The first `gh-ost` process had at least one successful checkpoint - The binlogs from the last checkpoint's binlog coordinates still exist on the replica gh-ost is inspecting (specified by `--host`) +- The checkpoint table (name ends with `_ghk`) still exists To resume, invoke `gh-ost` again with the same arguments with the `--resume` flag. diff --git a/doc/revert.md b/doc/revert.md new file mode 100644 index 000000000..6737da1b2 --- /dev/null +++ b/doc/revert.md @@ -0,0 +1,56 @@ +# Reverting Migrations + +`gh-ost` can attempt to revert a previously completed migration if the following conditions are met: +- The first `gh-ost` process was invoked with `--checkpoint` +- The checkpoint table (name ends with `_ghk`) still exists +- The binlogs from the time of the migration's cut-over still exist on the replica gh-ost is inspecting (specified by `--host`) + +To revert, find the name of the "old" table from the original migration e.g. `_mytable_del`. Then invoke `gh-ost` with the same arguments and the flags `--revert` and `--old-table="_mytable_del"`. +gh-ost will read the binlog coordinates of the original cut-over from the checkpoint table and bring the old table up to date. Then it performs another cut-over to complete the reversion. 
+Note that the checkpoint table (name ends with `_ghk`) will not be automatically dropped unless `--ok-to-drop-table` is provided. + +> [!WARNING] +> It is recommended to use `--checkpoint` with `--gtid` enabled so that checkpoint binlog coordinates store GTID sets rather than file positions. In that case, `gh-ost` can revert using a different replica than it originally attached to. + +### ❗ Note ❗ +Reverting is roughly equivalent to applying the "reverse" migration. _Before attempting to revert you should determine if the reverse migration is possible and does not involve any unacceptable data loss._ + +For example: if the original migration drops a `NOT NULL` column that has no `DEFAULT` then the reverse migration adds the column. In this case, the reverse migration is impossible if rows were added after the original cut-over and the revert will fail. +Another example: if the original migration modifies a `VARCHAR(32)` column to `VARCHAR(64)`, the reverse migration truncates the `VARCHAR(64)` column to `VARCHAR(32)`. If values were inserted with length > 32 after the cut-over then the revert will fail. + + +## Example +The migration starts with a `gh-ost` invocation such as: +```shell +gh-ost \ +--chunk-size=100 \ +--host=replica1.company.com \ +--database="mydb" \ +--table="mytable" \ +--alter="drop key idx1" \ +--gtid \ +--checkpoint \ +--checkpoint-seconds=60 \ +--execute +``` + +In this example `gh-ost` writes a cut-over checkpoint to `_mytable_ghk` after the cut-over is successful. The original table is renamed to `_mytable_del`. 
+ +If dropping the index causes problems, the migration can be reverted with: +```shell +# revert migration +gh-ost \ +--chunk-size=100 \ +--host=replica1.company.com \ +--database="mydb" \ +--table="mytable" \ +--old-table="_mytable_del" \ +--gtid \ +--checkpoint \ +--checkpoint-seconds=60 \ +--revert \ +--execute +``` + +gh-ost then reconnects at the binlog coordinates stored in the cut-over checkpoint and applies DMLs until the old table is up-to-date. +Note that the "reverse" migration is `ADD KEY idx1(...)` so there is no potential data loss to consider in this case. diff --git a/go/base/context.go b/go/base/context.go index 35030e30b..92f052a12 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -104,6 +104,8 @@ type MigrationContext struct { AzureMySQL bool AttemptInstantDDL bool Resume bool + Revert bool + OldTableName string // SkipPortValidation allows skipping the port validation in `ValidateConnection` // This is useful when connecting to a MySQL instance where the external port @@ -348,6 +350,10 @@ func getSafeTableName(baseName string, suffix string) string { // GetGhostTableName generates the name of ghost table, based on original table name // or a given table name func (this *MigrationContext) GetGhostTableName() string { + if this.Revert { + // When reverting the "ghost" table is the _del table from the original migration. 
+ return this.OldTableName + } if this.ForceTmpTableName != "" { return getSafeTableName(this.ForceTmpTableName, "gho") } else { @@ -364,14 +370,18 @@ func (this *MigrationContext) GetOldTableName() string { tableName = this.OriginalTableName } + suffix := "del" + if this.Revert { + suffix = "rev_del" + } if this.TimestampOldTable { t := this.StartTime timestamp := fmt.Sprintf("%d%02d%02d%02d%02d%02d", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) - return getSafeTableName(tableName, fmt.Sprintf("%s_del", timestamp)) + return getSafeTableName(tableName, fmt.Sprintf("%s_%s", timestamp, suffix)) } - return getSafeTableName(tableName, "del") + return getSafeTableName(tableName, suffix) } // GetChangelogTableName generates the name of changelog table, based on original table name diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 0a2a8afa4..2c2bc6665 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -148,6 +148,8 @@ func main() { flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Enable migration checkpoints") flag.Int64Var(&migrationContext.CheckpointIntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints") flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint") + flag.BoolVar(&migrationContext.Revert, "revert", false, "Attempt to revert completed migration") + flag.StringVar(&migrationContext.OldTableName, "old-table", "", "The name of the old table when using --revert, e.g. '_mytable_del'") maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes") criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. 
When status exceeds threshold, app panics and quits") @@ -206,12 +208,35 @@ func main() { migrationContext.SetConnectionCharset(*charset) - if migrationContext.AlterStatement == "" { + if migrationContext.AlterStatement == "" && !migrationContext.Revert { log.Fatal("--alter must be provided and statement must not be empty") } parser := sql.NewParserFromAlterStatement(migrationContext.AlterStatement) migrationContext.AlterStatementOptions = parser.GetAlterStatementOptions() + if migrationContext.Revert { + if migrationContext.Resume { + log.Fatal("--revert cannot be used with --resume") + } + if migrationContext.OldTableName == "" { + migrationContext.Log.Fatalf("--revert must be called with --old-table") + } + + // options irrelevant to revert mode + if migrationContext.AlterStatement != "" { + log.Warning("--alter was provided with --revert, it will be ignored") + } + if migrationContext.AttemptInstantDDL { + log.Warning("--attempt-instant-ddl was provided with --revert, it will be ignored") + } + if migrationContext.IncludeTriggers { + log.Warning("--include-triggers was provided with --revert, it will be ignored") + } + if migrationContext.DiscardForeignKeys { + log.Warning("--discard-foreign-keys was provided with --revert, it will be ignored") + } + } + if migrationContext.DatabaseName == "" { if parser.HasExplicitSchema() { migrationContext.DatabaseName = parser.GetExplicitSchema() @@ -347,7 +372,14 @@ func main() { acceptSignals(migrationContext) migrator := logic.NewMigrator(migrationContext, AppVersion) - if err := migrator.Migrate(); err != nil { + var err error + if migrationContext.Revert { + err = migrator.Revert() + } else { + err = migrator.Migrate() + } + + if err != nil { migrator.ExecOnFailureHook() migrationContext.Log.Fatale(err) } diff --git a/go/logic/applier.go b/go/logic/applier.go index 347817d6f..907d42995 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -437,6 +437,7 @@ func (this *Applier) CreateCheckpointTable() error { 
"`gh_ost_chk_iteration` bigint", "`gh_ost_rows_copied` bigint", "`gh_ost_dml_applied` bigint", + "`gh_ost_is_cutover` tinyint(1) DEFAULT '0'", } for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { if col.MySQLType == "" { @@ -444,18 +445,12 @@ func (this *Applier) CreateCheckpointTable() error { } minColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_min" colDef := fmt.Sprintf("%s %s", sql.EscapeName(minColName), col.MySQLType) - if !col.Nullable { - colDef += " NOT NULL" - } colDefs = append(colDefs, colDef) } for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { maxColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_max" colDef := fmt.Sprintf("%s %s", sql.EscapeName(maxColName), col.MySQLType) - if !col.Nullable { - colDef += " NOT NULL" - } colDefs = append(colDefs, colDef) } @@ -627,7 +622,7 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { if err != nil { return insertId, err } - args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied) + args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied, chk.IsCutover) args = append(args, uniqueKeyArgs...) res, err := this.db.Exec(query, args...) if err != nil { @@ -645,7 +640,7 @@ func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) { var coordStr string var timestamp int64 - ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied} + ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover} ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) err := row.Scan(ptrs...) 
diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 232349d16..23c34ed39 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -214,6 +214,7 @@ func (suite *ApplierTestSuite) SetupSuite() { testmysql.WithUsername(testMysqlUser), testmysql.WithPassword(testMysqlPass), testcontainers.WithWaitStrategy(wait.ForExposedPort()), + testmysql.WithConfigFile("my.cnf.test"), ) suite.Require().NoError(err) @@ -272,7 +273,7 @@ func (suite *ApplierTestSuite) TestInitDBConnections() { mysqlVersion, _ := strings.CutPrefix(testMysqlContainerImage, "mysql:") suite.Require().Equal(mysqlVersion, migrationContext.ApplierMySQLVersion) suite.Require().Equal(int64(28800), migrationContext.ApplierWaitTimeout) - suite.Require().Equal("SYSTEM", migrationContext.ApplierTimeZone) + suite.Require().Equal("+00:00", migrationContext.ApplierTimeZone) suite.Require().Equal(sql.NewColumnList([]string{"id", "item_id"}), migrationContext.OriginalTableColumnsOnApplier) } @@ -702,6 +703,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { Iteration: 2, RowsCopied: 100000, DMLApplied: 200000, + IsCutover: true, } id, err := applier.WriteCheckpoint(chk) suite.Require().NoError(err) @@ -716,6 +718,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String()) suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied) suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied) + suite.Require().Equal(chk.IsCutover, gotChk.IsCutover) } func TestApplier(t *testing.T) { diff --git a/go/logic/checkpoint.go b/go/logic/checkpoint.go index 69cc2bd20..cffe08c4b 100644 --- a/go/logic/checkpoint.go +++ b/go/logic/checkpoint.go @@ -28,4 +28,5 @@ type Checkpoint struct { Iteration int64 RowsCopied int64 DMLApplied int64 + IsCutover bool } diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 2543f8e9a..00a8d0fab 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -69,6 +69,7 @@ func 
(this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken)) env = append(env, fmt.Sprintf("GH_OST_DRY_RUN=%t", this.migrationContext.Noop)) + env = append(env, fmt.Sprintf("GH_OST_REVERT=%t", this.migrationContext.Revert)) env = append(env, extraVariables...) return env diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 33271d01a..507a503fe 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -44,6 +44,11 @@ func ReadChangelogState(s string) ChangelogState { type tableWriteFunc func() error +type lockProcessedStruct struct { + state string + coords mysql.BinlogCoordinates +} + type applyEventStruct struct { writeFunc *tableWriteFunc dmlEvent *binlog.BinlogDMLEvent @@ -85,7 +90,8 @@ type Migrator struct { firstThrottlingCollected chan bool ghostTableMigrated chan bool rowCopyComplete chan error - allEventsUpToLockProcessed chan string + allEventsUpToLockProcessed chan *lockProcessedStruct + lastLockProcessed *lockProcessedStruct rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but @@ -105,7 +111,7 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan error), - allEventsUpToLockProcessed: make(chan string), + allEventsUpToLockProcessed: make(chan *lockProcessedStruct), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), @@ -223,7 +229,10 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e this.ghostTableMigrated <- true case AllEventsUpToLockProcessed: var applyEventFunc tableWriteFunc = func() error { - this.allEventsUpToLockProcessed <- 
changelogStateString + this.allEventsUpToLockProcessed <- &lockProcessedStruct{ + state: changelogStateString, + coords: dmlEntry.Coordinates.Clone(), + } return nil } // at this point we know all events up to lock have been read from the streamer, @@ -355,7 +364,7 @@ func (this *Migrator) Migrate() (err error) { return err } // If we are resuming, we will initiateStreaming later when we know - // the coordinates to resume streaming. + // the binlog coordinates to resume streaming from. // If not resuming, the streamer must be initiated before the applier, // so that the "GhostTableMigrated" event gets processed. if !this.migrationContext.Resume { @@ -495,6 +504,15 @@ func (this *Migrator) Migrate() (err error) { } atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1) + if this.migrationContext.Checkpoint && !this.migrationContext.Noop { + cutoverChk, err := this.CheckpointAfterCutOver() + if err != nil { + this.migrationContext.Log.Warningf("failed to checkpoint after cutover: %+v", err) + } else { + this.migrationContext.Log.Infof("checkpoint success after cutover at coords=%+v", cutoverChk.LastTrxCoords.DisplayString()) + } + } + if err := this.finalCleanup(); err != nil { return nil } @@ -505,6 +523,98 @@ func (this *Migrator) Migrate() (err error) { return nil } +// Revert reverts a migration that previously completed by applying all DML events that happened +// after the original cutover, then doing another cutover to swap the tables back. +// The steps are similar to Migrate(), but without row copying. 
+func (this *Migrator) Revert() error { + this.migrationContext.Log.Infof("Reverting %s.%s from %s.%s", + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OldTableName)) + this.migrationContext.StartTime = time.Now() + var err error + if this.migrationContext.Hostname, err = os.Hostname(); err != nil { + return err + } + + go this.listenOnPanicAbort() + + if err := this.hooksExecutor.onStartup(); err != nil { + return err + } + if err := this.validateAlterStatement(); err != nil { + return err + } + defer this.teardown() + + if err := this.initiateInspector(); err != nil { + return err + } + if err := this.initiateApplier(); err != nil { + return err + } + if err := this.createFlagFiles(); err != nil { + return err + } + if err := this.inspector.inspectOriginalAndGhostTables(); err != nil { + return err + } + if err := this.applier.prepareQueries(); err != nil { + return err + } + + lastCheckpoint, err := this.applier.ReadLastCheckpoint() + if err != nil { + return this.migrationContext.Log.Errorf("No checkpoint found, unable to revert: %+v", err) + } + if !lastCheckpoint.IsCutover { + return this.migrationContext.Log.Errorf("Last checkpoint is not after cutover, unable to revert: coords=%+v time=%+v", lastCheckpoint.LastTrxCoords, lastCheckpoint.Timestamp) + } + this.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords + this.migrationContext.TotalRowsCopied = lastCheckpoint.RowsCopied + this.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin + this.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax + if err := this.initiateStreaming(); err != nil { + return err + } + if err := this.hooksExecutor.onValidated(); err != nil { + return err + } + if err := this.initiateServer(); err != nil { + return err + } + defer 
this.server.RemoveSocketFile() + if err := this.addDMLEventsListener(); err != nil { + return err + } + + this.initiateThrottler() + go this.initiateStatus() + go this.executeDMLWriteFuncs() + + this.printStatus(ForcePrintStatusRule) + var retrier func(func() error, ...bool) error + if this.migrationContext.CutOverExponentialBackoff { + retrier = this.retryOperationWithExponentialBackoff + } else { + retrier = this.retryOperation + } + if err := this.hooksExecutor.onBeforeCutOver(); err != nil { + return err + } + if err := retrier(this.cutOver); err != nil { + return err + } + atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1) + if err := this.finalCleanup(); err != nil { + return nil + } + if err := this.hooksExecutor.onSuccess(); err != nil { + return err + } + this.migrationContext.Log.Infof("Done reverting %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil +} + // ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external // hook access point func (this *Migrator) ExecOnFailureHook() (err error) { @@ -622,7 +732,7 @@ func (this *Migrator) cutOver() (err error) { // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. 
-func (this *Migrator) waitForEventsUpToLock() (err error) { +func (this *Migrator) waitForEventsUpToLock() error { timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) this.migrationContext.MarkPointOfInterest() @@ -635,19 +745,21 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { } this.migrationContext.Log.Infof("Waiting for events up to lock") atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1) + var lockProcessed *lockProcessedStruct for found := false; !found; { select { case <-timeout.C: { return this.migrationContext.Log.Errorf("Timeout while waiting for events up to lock") } - case state := <-this.allEventsUpToLockProcessed: + case lockProcessed = <-this.allEventsUpToLockProcessed: { - if state == allEventsUpToLockProcessedChallenge { - this.migrationContext.Log.Infof("Waiting for events up to lock: got %s", state) + if lockProcessed.state == allEventsUpToLockProcessedChallenge { + this.migrationContext.Log.Infof("Waiting for events up to lock: got %s", lockProcessed.state) found = true + this.lastLockProcessed = lockProcessed } else { - this.migrationContext.Log.Infof("Waiting for events up to lock: skipping %s", state) + this.migrationContext.Log.Infof("Waiting for events up to lock: skipping %s", lockProcessed.state) } } } @@ -1220,7 +1332,12 @@ func (this *Migrator) initiateApplier() error { if err := this.applier.InitDBConnections(); err != nil { return err } - if !this.migrationContext.Resume { + if this.migrationContext.Revert { + if err := this.applier.CreateChangelogTable(); err != nil { + this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? 
Bailing out") + return err + } + } else if !this.migrationContext.Resume { if err := this.applier.ValidateOrDropExistingTables(); err != nil { return err } @@ -1435,6 +1552,35 @@ func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { } } +// CheckpointAfterCutOver writes a final checkpoint after the cutover completes successfully. +func (this *Migrator) CheckpointAfterCutOver() (*Checkpoint, error) { + if this.lastLockProcessed == nil || this.lastLockProcessed.coords.IsEmpty() { + return nil, this.migrationContext.Log.Errorf("lastLockProcessed coords are empty") + } + + chk := &Checkpoint{ + IsCutover: true, + LastTrxCoords: this.lastLockProcessed.coords, + IterationRangeMin: sql.NewColumnValues(this.migrationContext.UniqueKey.Len()), + IterationRangeMax: sql.NewColumnValues(this.migrationContext.UniqueKey.Len()), + Iteration: this.migrationContext.GetIteration(), + RowsCopied: atomic.LoadInt64(&this.migrationContext.TotalRowsCopied), + DMLApplied: atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), + } + this.applier.LastIterationRangeMutex.Lock() + if this.applier.LastIterationRangeMinValues != nil { + chk.IterationRangeMin = this.applier.LastIterationRangeMinValues.Clone() + } + if this.applier.LastIterationRangeMaxValues != nil { + chk.IterationRangeMax = this.applier.LastIterationRangeMaxValues.Clone() + } + this.applier.LastIterationRangeMutex.Unlock() + + id, err := this.applier.WriteCheckpoint(chk) + chk.Id = id + return chk, err +} + func (this *Migrator) checkpointLoop() { if this.migrationContext.Noop { this.migrationContext.Log.Debugf("Noop operation; not really checkpointing") @@ -1443,9 +1589,12 @@ func (this *Migrator) checkpointLoop() { checkpointInterval := time.Duration(this.migrationContext.CheckpointIntervalSeconds) * time.Second ticker := time.NewTicker(checkpointInterval) for t := range ticker.C { - if atomic.LoadInt64(&this.finishedMigrating) > 0 { + if atomic.LoadInt64(&this.finishedMigrating) > 0 || 
atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) > 0 { return } + if atomic.LoadInt64(&this.migrationContext.InCutOverCriticalSectionFlag) > 0 { + continue + } this.migrationContext.Log.Infof("starting checkpoint at %+v", t) ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout) chk, err := this.Checkpoint(ctx) @@ -1517,6 +1666,31 @@ func (this *Migrator) executeWriteFuncs() error { } } +func (this *Migrator) executeDMLWriteFuncs() error { + if this.migrationContext.Noop { + this.migrationContext.Log.Debugf("Noop operation; not really executing DML write funcs") + return nil + } + for { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return nil + } + + this.throttler.throttle(nil) + + select { + case eventStruct := <-this.applyEventsQueue: + { + if err := this.onApplyEventStruct(eventStruct); err != nil { + return err + } + } + case <-time.After(time.Second): + continue + } + } +} + // finalCleanup takes actions at very end of migration, dropping tables etc. func (this *Migrator) finalCleanup() error { atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1) @@ -1541,17 +1715,19 @@ func (this *Migrator) finalCleanup() error { if err := this.retryOperation(this.applier.DropChangelogTable); err != nil { return err } - if err := this.retryOperation(this.applier.DropCheckpointTable); err != nil { - return err - } if this.migrationContext.OkToDropTable && !this.migrationContext.TestOnReplica { if err := this.retryOperation(this.applier.DropOldTable); err != nil { return err } - } else { - if !this.migrationContext.Noop { - this.migrationContext.Log.Infof("Am not dropping old table because I want this operation to be as live as possible. If you insist I should do it, please add `--ok-to-drop-table` next time. But I prefer you do not. 
To drop the old table, issue:") - this.migrationContext.Log.Infof("-- drop table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName())) + if err := this.retryOperation(this.applier.DropCheckpointTable); err != nil { + return err + } + } else if !this.migrationContext.Noop { + this.migrationContext.Log.Infof("Am not dropping old table because I want this operation to be as live as possible. If you insist I should do it, please add `--ok-to-drop-table` next time. But I prefer you do not. To drop the old table, issue:") + this.migrationContext.Log.Infof("-- drop table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName())) + if this.migrationContext.Checkpoint { + this.migrationContext.Log.Infof("Am not dropping checkpoint table without `--ok-to-drop-table`. To drop the checkpoint table, issue:") + this.migrationContext.Log.Infof("-- drop table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetCheckpointTableName())) } } if this.migrationContext.Noop { diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index b268054ab..7e5ddab9f 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -301,6 +301,7 @@ func (suite *MigratorTestSuite) SetupSuite() { testmysql.WithUsername(testMysqlUser), testmysql.WithPassword(testMysqlPass), testcontainers.WithWaitStrategy(wait.ForExposedPort()), + testmysql.WithConfigFile("my.cnf.test"), ) suite.Require().NoError(err) @@ -333,6 +334,10 @@ func (suite *MigratorTestSuite) TearDownTest() { suite.Require().NoError(err) _, err = suite.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestGhostTableName()) suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestRevertedTableName()) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestOldTableName()) + 
suite.Require().NoError(err) } func (suite *MigratorTestSuite) TestMigrateEmpty() { @@ -667,6 +672,138 @@ func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() { suite.Require().Equal("CREATE TABLE `testing` (\n `id` int NOT NULL,\n `name` varchar(64) DEFAULT NULL,\n `foobar` varchar(255) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createTableSQL) } +func (suite *MigratorTestSuite) TestRevertEmpty() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, s CHAR(32))", getTestTableName())) + suite.Require().NoError(err) + + var oldTableName string + + // perform original migration + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + { + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.AlterStatement = "ADD COLUMN newcol CHAR(32)" + migrationContext.Checkpoint = true + migrationContext.CheckpointIntervalSeconds = 10 + migrationContext.DropServeSocket = true + migrationContext.InitiallyDropOldTable = true + migrationContext.UseGTIDs = true + + migrator := NewMigrator(migrationContext, "0.0.0") + + err = migrator.Migrate() + oldTableName = migrationContext.GetOldTableName() + suite.Require().NoError(err) + } + + // revert the original migration + { + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.DropServeSocket = true + migrationContext.UseGTIDs = true + migrationContext.Revert = true + migrationContext.OkToDropTable = true + migrationContext.OldTableName = oldTableName + + migrator := 
NewMigrator(migrationContext, "0.0.0") + + err = migrator.Revert() + suite.Require().NoError(err) + } +} + +func (suite *MigratorTestSuite) TestRevert() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, s CHAR(32))", getTestTableName())) + suite.Require().NoError(err) + + numRows := 0 + for range 100 { + _, err = suite.db.ExecContext(ctx, + fmt.Sprintf("INSERT INTO %s (id, s) VALUES (%d, MD5('%d'))", getTestTableName(), numRows, numRows)) + suite.Require().NoError(err) + numRows += 1 + } + + var oldTableName string + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + // perform original migration + { + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.AlterStatement = "ADD INDEX idx1 (s)" + migrationContext.Checkpoint = true + migrationContext.CheckpointIntervalSeconds = 10 + migrationContext.DropServeSocket = true + migrationContext.InitiallyDropOldTable = true + migrationContext.UseGTIDs = true + + migrator := NewMigrator(migrationContext, "0.0.0") + + err = migrator.Migrate() + oldTableName = migrationContext.GetOldTableName() + suite.Require().NoError(err) + } + + // do some writes + for range 100 { + _, err = suite.db.ExecContext(ctx, + fmt.Sprintf("INSERT INTO %s (id, s) VALUES (%d, MD5('%d'))", getTestTableName(), numRows, numRows)) + suite.Require().NoError(err) + numRows += 1 + } + for i := 0; i < numRows; i += 7 { + _, err = suite.db.ExecContext(ctx, + fmt.Sprintf("UPDATE %s SET s=MD5('%d') where id=%d", getTestTableName(), 2*i, i)) + suite.Require().NoError(err) + } + + // revert the original migration + { + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + 
migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.DropServeSocket = true + migrationContext.UseGTIDs = true + migrationContext.Revert = true + migrationContext.OldTableName = oldTableName + + migrator := NewMigrator(migrationContext, "0.0.0") + + err = migrator.Revert() + oldTableName = migrationContext.GetOldTableName() + suite.Require().NoError(err) + } + + // checksum original and reverted table + var _tableName, checksum1, checksum2 string + rows, err := suite.db.Query(fmt.Sprintf("CHECKSUM TABLE %s, %s", testMysqlTableName, oldTableName)) + suite.Require().NoError(err) + defer rows.Close() + suite.Require().True(rows.Next()) + suite.Require().NoError(rows.Scan(&_tableName, &checksum1)) + suite.Require().True(rows.Next()) + suite.Require().NoError(rows.Scan(&_tableName, &checksum2)) + suite.Require().NoError(rows.Err()) + + suite.Require().Equal(checksum1, checksum2) +} + func TestMigrator(t *testing.T) { suite.Run(t, new(MigratorTestSuite)) } diff --git a/go/logic/my.cnf.test b/go/logic/my.cnf.test new file mode 100644 index 000000000..2938c12cd --- /dev/null +++ b/go/logic/my.cnf.test @@ -0,0 +1,27 @@ +# mysql server configuration for testcontainer +[mysqld] +max_connections = 200 +innodb_log_file_size = 64M +innodb_flush_log_at_trx_commit = 2 +innodb_flush_method = O_DIRECT +skip-name-resolve +skip-ssl + +character-set-server = utf8mb4 +collation-server = utf8mb4_0900_ai_ci + +default-time-zone = '+00:00' + +sql_mode = STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION + +general_log = 0 +general_log_file = /var/log/mysql/general.log +slow_query_log = 0 +slow_query_log_file = /var/log/mysql/slow.log +long_query_time = 2 + +gtid_mode=ON +enforce_gtid_consistency=ON + +[client] +default-character-set = utf8mb4 diff --git a/go/logic/test_utils.go b/go/logic/test_utils.go index d532e0920..f552cfc76 100644 --- a/go/logic/test_utils.go +++ b/go/logic/test_utils.go @@ -28,6 +28,14 @@ func 
getTestGhostTableName() string { return fmt.Sprintf("`%s`.`_%s_gho`", testMysqlDatabase, testMysqlTableName) } +func getTestRevertedTableName() string { + return fmt.Sprintf("`%s`.`_%s_rev_del`", testMysqlDatabase, testMysqlTableName) +} + +func getTestOldTableName() string { + return fmt.Sprintf("`%s`.`_%s_del`", testMysqlDatabase, testMysqlTableName) +} + func getTestConnectionConfig(ctx context.Context, container testcontainers.Container) (*mysql.ConnectionConfig, error) { host, err := container.Host(ctx) if err != nil { diff --git a/go/sql/builder.go b/go/sql/builder.go index 757d74910..61dd9706f 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -142,11 +142,11 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns insert /* gh-ost */ into %s.%s (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, - gh_ost_rows_copied, gh_ost_dml_applied, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, %s, %s) values (unix_timestamp(now()), ?, ?, - ?, ?, + ?, ?, ?, %s, %s)`, databaseName, tableName, strings.Join(minUniqueColNames, ", "), diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 06e402c89..840d85c96 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -803,12 +803,12 @@ func TestCheckpointQueryBuilder(t *testing.T) { expected := ` insert /* gh-ost */ into mydb._tbl_ghk (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, - gh_ost_rows_copied, gh_ost_dml_applied, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, name_min, position_min, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_min, name_max, position_max, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_max) values (unix_timestamp(now()), ?, ?, - ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?) `