diff --git a/doc/hooks.md b/doc/hooks.md
index c1fe59453..08a450358 100644
--- a/doc/hooks.md
+++ b/doc/hooks.md
@@ -49,6 +49,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu
 - `gh-ost-on-before-cut-over`
 - `gh-ost-on-success`
 - `gh-ost-on-failure`
+- `gh-ost-on-batch-copy-retry`
 
 ### Context
 
@@ -81,6 +82,7 @@ The following variable are available on particular hooks:
 
 - `GH_OST_COMMAND` is only available in `gh-ost-on-interactive-command`
 - `GH_OST_STATUS` is only available in `gh-ost-on-status`
+- `GH_OST_LAST_BATCH_COPY_ERROR` is only available in `gh-ost-on-batch-copy-retry`
 
 ### Examples
 
diff --git a/go/base/context.go b/go/base/context.go
index 35030e30b..4274aedb1 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -600,6 +600,16 @@ func (this *MigrationContext) GetIteration() int64 {
 	return atomic.LoadInt64(&this.Iteration)
 }
 
+// SetNextIterationRangeMinValues makes the previous iteration's range max values the
+// min values of the next iteration, falling back to the migration-wide range min values
+// when no previous max exists. Call it before Applier.CalculateNextIterationRangeEndValues.
+func (this *MigrationContext) SetNextIterationRangeMinValues() {
+	this.MigrationIterationRangeMinValues = this.MigrationIterationRangeMaxValues
+	if this.MigrationIterationRangeMinValues == nil {
+		this.MigrationIterationRangeMinValues = this.MigrationRangeMinValues
+	}
+}
+
 func (this *MigrationContext) MarkPointOfInterest() int64 {
 	this.pointOfInterestTimeMutex.Lock()
 	defer this.pointOfInterestTimeMutex.Unlock()
diff --git a/go/logic/applier.go b/go/logic/applier.go
index 3fe7f2287..5a87c1560 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -825,10 +825,6 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
 	}
 	this.LastIterationRangeMutex.Unlock()
 
-	this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
-	if this.migrationContext.MigrationIterationRangeMinValues == nil {
-		this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
-	}
 	for i := 0; i < 2; i++ {
 		buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
 		if i == 1 {
diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go
index 232349d16..3eeaf92fb 100644
--- a/go/logic/applier_test.go
+++ b/go/logic/applier_test.go
@@ -541,7 +541,8 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
 	err = applier.ReadMigrationRangeValues()
 	suite.Require().NoError(err)
 
+	migrationContext.SetNextIterationRangeMinValues()
 	hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
 	suite.Require().NoError(err)
 	suite.Require().True(hasFurtherRange)
 
@@ -619,6 +620,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai
 	err = applier.AlterGhost()
 	suite.Require().NoError(err)
 
+	migrationContext.SetNextIterationRangeMinValues()
 	hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
 	suite.Require().NoError(err)
 	suite.Require().True(hasFurtherRange)
diff --git a/go/logic/hooks.go b/go/logic/hooks.go
index 2543f8e9a..5ba988a53 100644
--- a/go/logic/hooks.go
+++ b/go/logic/hooks.go
@@ -28,6 +28,7 @@ const (
 	onInteractiveCommand = "gh-ost-on-interactive-command"
 	onSuccess            = "gh-ost-on-success"
 	onFailure            = "gh-ost-on-failure"
+	onBatchCopyRetry     = "gh-ost-on-batch-copy-retry"
 	onStatus             = "gh-ost-on-status"
 	onStopReplication    = "gh-ost-on-stop-replication"
 	onStartReplication   = "gh-ost-on-start-replication"
@@ -77,6 +78,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
 // executeHook executes a command, and sets relevant environment variables
 // combined output & error are printed to the configured writer.
 func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
+	this.migrationContext.Log.Infof("executing hook: %+v", hook)
 	cmd := exec.Command(hook)
 	cmd.Env = this.applyEnvironmentVariables(extraVariables...)
 
@@ -123,6 +125,13 @@ func (this *HooksExecutor) onBeforeRowCopy() error {
 	return this.executeHooks(onBeforeRowCopy)
 }
 
+// onBatchCopyRetry runs the gh-ost-on-batch-copy-retry hooks, exposing errorMessage
+// to them through the GH_OST_LAST_BATCH_COPY_ERROR environment variable.
+func (this *HooksExecutor) onBatchCopyRetry(errorMessage string) error {
+	v := fmt.Sprintf("GH_OST_LAST_BATCH_COPY_ERROR=%s", errorMessage)
+	return this.executeHooks(onBatchCopyRetry, v)
+}
+
 func (this *HooksExecutor) onRowCopyComplete() error {
 	return this.executeHooks(onRowCopyComplete)
 }
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 33271d01a..0e16cc6c1 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -129,6 +129,23 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
 	}
 }
 
+// retryBatchCopyWithHooks runs operation through retryOperation; after every failed
+// attempt it invokes the gh-ost-on-batch-copy-retry hook with the error text.
+func (this *Migrator) retryBatchCopyWithHooks(operation func() error, notFatalHint ...bool) (err error) {
+	wrappedOperation := func() error {
+		if err := operation(); err != nil {
+			// A failing hook must not mask the copy error that drives the retry, so it is only logged.
+			if hookErr := this.hooksExecutor.onBatchCopyRetry(err.Error()); hookErr != nil {
+				this.migrationContext.Log.Warningf("gh-ost-on-batch-copy-retry hook failed: %+v", hookErr)
+			}
+			return err
+		}
+		return nil
+	}
+
+	return this.retryOperation(wrappedOperation, notFatalHint...)
+}
+
 // retryOperation attempts up to `count` attempts at running given function,
 // exiting as soon as it returns with non-error.
 func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) {
@@ -1280,27 +1297,24 @@ func (this *Migrator) iterateChunks() error {
 			return nil
 		}
 		copyRowsFunc := func() error {
-			if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
-				// Done.
-				// There's another such check down the line
-				return nil
-			}
-
-			// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
-
-			hasFurtherRange := false
-			if err := this.retryOperation(func() (e error) {
-				hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
-				return e
-			}); err != nil {
-				return terminateRowIteration(err)
-			}
-			if !hasFurtherRange {
-				atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
-				return terminateRowIteration(nil)
-			}
+			this.migrationContext.SetNextIterationRangeMinValues()
 			// Copy task:
 			applyCopyRowsFunc := func() error {
+				if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
+					// Done.
+					// There's another such check down the line
+					return nil
+				}
+
+				// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hang forever
+				hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
+				if err != nil {
+					return err // wrapping call will retry
+				}
+				if !hasFurtherRange {
+					atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
+					return terminateRowIteration(nil)
+				}
 				if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
 					// No need for more writes.
 					// This is the de-facto place where we avoid writing in the event of completed cut-over.
@@ -1331,7 +1340,7 @@ func (this *Migrator) iterateChunks() error {
 				atomic.AddInt64(&this.migrationContext.Iteration, 1)
 				return nil
 			}
-			if err := this.retryOperation(applyCopyRowsFunc); err != nil {
+			if err := this.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil {
 				return terminateRowIteration(err)
 			}
 			return nil
diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go
index b268054ab..6772a3332 100644
--- a/go/logic/migrator_test.go
+++ b/go/logic/migrator_test.go
@@ -6,10 +6,12 @@
 package logic
 
 import (
+	"bytes"
 	"context"
 	gosql "database/sql"
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	"path/filepath"
 	"strings"
@@ -324,6 +326,10 @@ func (suite *MigratorTestSuite) SetupTest() {
 
 	_, err := suite.db.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+testMysqlDatabase)
 	suite.Require().NoError(err)
+
+	// Best-effort removal of the socket path that TestRetryBatchCopyWithHooks serves on;
+	// the error is deliberately ignored (os.Remove fails when the file does not exist).
+	_ = os.Remove("/tmp/gh-ost.sock")
 }
 
 func (suite *MigratorTestSuite) TearDownTest() {
@@ -379,6 +385,149 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
 	suite.Require().Equal("_testing_del", tableName)
 }
 
+func (suite *MigratorTestSuite) TestRetryBatchCopyWithHooks() {
+	ctx := context.Background()
+
+	_, err := suite.db.ExecContext(ctx, "CREATE TABLE test.test_retry_batch (id INT PRIMARY KEY AUTO_INCREMENT, name TEXT)")
+	suite.Require().NoError(err)
+
+	const initStride = 1000
+	const totalBatches = 3
+	for i := 0; i < totalBatches; i++ {
+		dataSize := 50 * i
+		for j := 0; j < initStride; j++ {
+			_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO test.test_retry_batch (name) VALUES ('%s')", strings.Repeat("a", dataSize)))
+			suite.Require().NoError(err)
+		}
+	}
+
+	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*8))
+	suite.Require().NoError(err)
+	defer func() {
+		_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*1024*1024))
+		suite.Require().NoError(err)
+	}()
+
+	tmpDir, err := os.MkdirTemp("", "gh-ost-hooks")
+	suite.Require().NoError(err)
+	defer os.RemoveAll(tmpDir)
+
+	hookScript := filepath.Join(tmpDir, "gh-ost-on-batch-copy-retry")
+	hookContent := `#!/bin/bash
+# Mock hook that reduces chunk size on binlog cache error
+ERROR_MSG="$GH_OST_LAST_BATCH_COPY_ERROR"
+SOCKET_PATH="/tmp/gh-ost.sock"
+
+if ! [[ "$ERROR_MSG" =~ "max_binlog_cache_size" ]]; then
+    echo "Nothing to do for error: $ERROR_MSG"
+    exit 0
+fi
+
+CHUNK_SIZE=$(echo "chunk-size=?" | nc -U $SOCKET_PATH | tr -d '\n')
+
+MIN_CHUNK_SIZE=10
+NEW_CHUNK_SIZE=$(( CHUNK_SIZE * 8 / 10 ))
+if [ $NEW_CHUNK_SIZE -lt $MIN_CHUNK_SIZE ]; then
+    NEW_CHUNK_SIZE=$MIN_CHUNK_SIZE
+fi
+
+if [ $CHUNK_SIZE -eq $NEW_CHUNK_SIZE ]; then
+    echo "Chunk size unchanged: $CHUNK_SIZE"
+    exit 0
+fi
+
+echo "[gh-ost-on-batch-copy-retry]: Changing chunk size from $CHUNK_SIZE to $NEW_CHUNK_SIZE"
+echo "chunk-size=$NEW_CHUNK_SIZE" | nc -U $SOCKET_PATH
+echo "[gh-ost-on-batch-copy-retry]: Done, exiting..."
+`
+	err = os.WriteFile(hookScript, []byte(hookContent), 0755)
+	suite.Require().NoError(err)
+
+	// Redirect stdout/stderr into pipes for the duration of the migration.
+	rOut, wOut, err := os.Pipe()
+	suite.Require().NoError(err)
+	defer rOut.Close()
+	rErr, wErr, err := os.Pipe()
+	suite.Require().NoError(err)
+	defer rErr.Close()
+
+	origStdout, origStderr := os.Stdout, os.Stderr
+	os.Stdout, os.Stderr = wOut, wErr
+	outputRestored := false
+	restoreOutput := func() {
+		if outputRestored {
+			return
+		}
+		outputRestored = true
+		os.Stdout, os.Stderr = origStdout, origStderr
+		wOut.Close()
+		wErr.Close()
+	}
+	// Deferred so that a failing Require() cannot leave the process output redirected.
+	defer restoreOutput()
+
+	// Drain both pipes while the migration runs: nothing else reads them, so
+	// writers would block as soon as the bounded pipe buffer fills up.
+	var bufOut, bufErr bytes.Buffer
+	outDrained, errDrained := make(chan struct{}), make(chan struct{})
+	go func() {
+		defer close(outDrained)
+		_, _ = io.Copy(&bufOut, rOut)
+	}()
+	go func() {
+		defer close(errDrained)
+		_, _ = io.Copy(&bufErr, rErr)
+	}()
+
+	connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
+	suite.Require().NoError(err)
+
+	migrationContext := base.NewMigrationContext()
+	migrationContext.AllowedRunningOnMaster = true
+	migrationContext.ApplierConnectionConfig = connectionConfig
+	migrationContext.InspectorConnectionConfig = connectionConfig
+	migrationContext.DatabaseName = "test"
+	migrationContext.SkipPortValidation = true
+	migrationContext.OriginalTableName = "test_retry_batch"
+	migrationContext.SetConnectionConfig("innodb")
+	migrationContext.AlterStatementOptions = "MODIFY name LONGTEXT, ENGINE=InnoDB"
+	migrationContext.ReplicaServerId = 99999
+	migrationContext.HeartbeatIntervalMilliseconds = 100
+	migrationContext.ThrottleHTTPIntervalMillis = 100
+	migrationContext.ThrottleHTTPTimeoutMillis = 1000
+	migrationContext.HooksPath = tmpDir
+	migrationContext.ChunkSize = 1000
+	migrationContext.SetDefaultNumRetries(10)
+	migrationContext.ServeSocketFile = "/tmp/gh-ost.sock"
+
+	migrator := NewMigrator(migrationContext, "0.0.0")
+
+	err = migrator.Migrate()
+	// Close the write ends before waiting, otherwise the drain goroutines never see EOF.
+	restoreOutput()
+	<-outDrained
+	<-errDrained
+	suite.Require().NoError(err)
+
+	outStr := bufOut.String()
+	errStr := bufErr.String()
+
+	suite.Assert().Contains(outStr, "chunk-size: 1000")
+	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 1000 to 800")
+	suite.Assert().Contains(outStr, "chunk-size: 800")
+
+	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 800 to 640")
+	suite.Assert().Contains(outStr, "chunk-size: 640")
+
+	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 640 to 512")
+	suite.Assert().Contains(outStr, "chunk-size: 512")
+
+	var count int
+	err = suite.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test.test_retry_batch").Scan(&count)
+	suite.Require().NoError(err)
+	suite.Assert().Equal(initStride*totalBatches, count)
+}
+
 func (suite *MigratorTestSuite) TestCopierIntPK() {
 	ctx := context.Background()
 
diff --git a/localtests/copy-retries-exhausted/after.sql b/localtests/copy-retries-exhausted/after.sql
new file mode 100644
index 000000000..c3d55e5b9
--- /dev/null
+++ b/localtests/copy-retries-exhausted/after.sql
@@ -0,0 +1 @@
+set global max_binlog_cache_size = 1073741824; -- 1GB
diff --git a/localtests/copy-retries-exhausted/before.sql b/localtests/copy-retries-exhausted/before.sql
new file mode 100644
index 000000000..a3570171a
--- /dev/null
+++ b/localtests/copy-retries-exhausted/before.sql
@@ -0,0 +1 @@
+set global max_binlog_cache_size = 4096; -- 4096 is the documented minimum for this variable
diff --git a/localtests/copy-retries-exhausted/create.sql b/localtests/copy-retries-exhausted/create.sql
new file mode 100644
index 000000000..4e37938ec
--- /dev/null
+++ b/localtests/copy-retries-exhausted/create.sql
@@ -0,0 +1,12 @@
+drop table if exists gh_ost_test;
+create table gh_ost_test (
+  id int auto_increment,
+  name mediumtext not null,
+  primary key (id)
+) auto_increment=1;
+
+insert into gh_ost_test (name)
+select repeat('a', 1500)
+from information_schema.columns
+cross join information_schema.tables
+limit 1000;
diff --git a/localtests/copy-retries-exhausted/expect_failure b/localtests/copy-retries-exhausted/expect_failure
new file mode 100644
index 000000000..cd6a516ec
--- /dev/null
+++ b/localtests/copy-retries-exhausted/expect_failure
@@ -0,0 +1 @@
+Multi-statement transaction required more than 'max_binlog_cache_size' bytes of storage
diff --git a/localtests/copy-retries-exhausted/extra_args b/localtests/copy-retries-exhausted/extra_args
new file mode 100644
index 000000000..e4f8a0104
--- /dev/null
+++ b/localtests/copy-retries-exhausted/extra_args
@@ -0,0 +1 @@
+--alter "modify column name mediumtext" --default-retries=1 --chunk-size=1000
diff --git a/localtests/test.sh b/localtests/test.sh
index 6776d227a..7d0e992c7 100755
--- a/localtests/test.sh
+++ b/localtests/test.sh
@@ -225,6 +225,11 @@ test_single() {
     cat $tests_path/$test_name/create.sql
     return 1
   fi
+
+  if [ -f $tests_path/$test_name/before.sql ]; then
+    gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql
+    gh-ost-test-mysql-replica --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql
+  fi
 
   extra_args=""
   if [ -f $tests_path/$test_name/extra_args ]; then
@@ -314,6 +319,11 @@ test_single() {
     gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "set @@global.sql_mode='${original_sql_mode}'"
   fi
 
+  if [ -f $tests_path/$test_name/after.sql ]; then
+    gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/after.sql
+    gh-ost-test-mysql-replica --default-character-set=utf8mb4 test < $tests_path/$test_name/after.sql
+  fi
+
   if [ -f $tests_path/$test_name/destroy.sql ]; then
     gh-ost-test-mysql-master --default-character-set=utf8mb4 test <$tests_path/$test_name/destroy.sql
   fi