refactoring TestIntegrationEmbedded, add getEmbeddedRestoreSettings t…
Slach committed Aug 29, 2024
1 parent 54bbffc commit dfa2811
Showing 5 changed files with 47 additions and 23 deletions.
32 changes: 25 additions & 7 deletions pkg/backup/backuper.go
@@ -195,7 +195,8 @@ func (b *Backuper) isDiskTypeEncryptedObject(disk clickhouse.Disk, disks []click
return underlyingIdx >= 0
}

func (b *Backuper) getEmbeddedBackupDefaultSettings(version int) []string {
// getEmbeddedRestoreSettings - differs from getEmbeddedBackupSettings because of https://github.com/ClickHouse/ClickHouse/issues/69053
func (b *Backuper) getEmbeddedRestoreSettings(version int) []string {
settings := []string{}
if (b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs") && version >= 23007000 {
settings = append(settings, "allow_s3_native_copy=1")
@@ -209,7 +210,24 @@ func (b *Backuper) getEmbeddedBackupDefaultSettings(version int) []string {
log.Fatal().Msgf("SET s3_use_adaptive_timeouts=0 error: %v", err)
}
}
if b.cfg.General.RemoteStorage == "azblob" && version >= 24005000 {
return settings
}

func (b *Backuper) getEmbeddedBackupSettings(version int) []string {
settings := []string{}
if (b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs") && version >= 23007000 {
settings = append(settings, "allow_s3_native_copy=1")
if err := b.ch.Query("SET s3_request_timeout_ms=600000"); err != nil {
log.Fatal().Msgf("SET s3_request_timeout_ms=600000 error: %v", err)
}

}
if (b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs") && version >= 23011000 {
if err := b.ch.Query("SET s3_use_adaptive_timeouts=0"); err != nil {
log.Fatal().Msgf("SET s3_use_adaptive_timeouts=0 error: %v", err)
}
}
if b.cfg.General.RemoteStorage == "azblob" && version >= 24005000 && b.cfg.ClickHouse.EmbeddedBackupDisk == "" {
settings = append(settings, "allow_azure_native_copy=1")
}
return settings
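
The numeric gates in these two functions (23007000, 23011000, 24005000) read as ClickHouse versions packed into one integer, apparently MAJOR*1000000 + MINOR*1000 + PATCH, so 23007000 is 23.7 and 24005000 is 24.5. A minimal sketch of that encoding; the helper name is hypothetical, not part of this repository:

```go
package main

import "fmt"

// toVersionInt is a hypothetical helper illustrating the encoding the
// version gates above appear to use: MAJOR*1_000_000 + MINOR*1_000 + PATCH.
func toVersionInt(major, minor, patch int) int {
	return major*1_000_000 + minor*1_000 + patch
}

func main() {
	fmt.Println(toVersionInt(23, 7, 0))  // 23007000: allow_s3_native_copy gate
	fmt.Println(toVersionInt(23, 11, 0)) // 23011000: s3_use_adaptive_timeouts gate
	fmt.Println(toVersionInt(24, 5, 0))  // 24005000: allow_azure_native_copy gate
}
```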
@@ -229,10 +247,10 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
return "", err
}
if b.cfg.S3.AccessKey != "" {
return fmt.Sprintf("S3('%s/%s','%s','%s')", s3Endpoint, backupName, b.cfg.S3.AccessKey, b.cfg.S3.SecretKey), nil
return fmt.Sprintf("S3('%s/%s/','%s','%s')", s3Endpoint, backupName, b.cfg.S3.AccessKey, b.cfg.S3.SecretKey), nil
}
if os.Getenv("AWS_ACCESS_KEY_ID") != "" {
return fmt.Sprintf("S3('%s/%s','%s','%s')", s3Endpoint, backupName, os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY")), nil
return fmt.Sprintf("S3('%s/%s/','%s','%s')", s3Endpoint, backupName, os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY")), nil
}
return "", fmt.Errorf("provide s3->access_key and s3->secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
}
@@ -242,10 +260,10 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
return "", err
}
if b.cfg.GCS.EmbeddedAccessKey != "" {
return fmt.Sprintf("S3('%s/%s','%s','%s')", gcsEndpoint, backupName, b.cfg.GCS.EmbeddedAccessKey, b.cfg.GCS.EmbeddedSecretKey), nil
return fmt.Sprintf("S3('%s/%s/','%s','%s')", gcsEndpoint, backupName, b.cfg.GCS.EmbeddedAccessKey, b.cfg.GCS.EmbeddedSecretKey), nil
}
if os.Getenv("AWS_ACCESS_KEY_ID") != "" {
return fmt.Sprintf("S3('%s/%s','%s','%s')", gcsEndpoint, backupName, os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY")), nil
return fmt.Sprintf("S3('%s/%s/','%s','%s')", gcsEndpoint, backupName, os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY")), nil
}
return "", fmt.Errorf("provide gcs->embedded_access_key and gcs->embedded_secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
}
@@ -255,7 +273,7 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
return "", err
}
if b.cfg.AzureBlob.Container != "" {
return fmt.Sprintf("AzureBlobStorage('%s','%s','%s/%s')", azblobEndpoint, b.cfg.AzureBlob.Container, b.cfg.AzureBlob.ObjectDiskPath, backupName), nil
return fmt.Sprintf("AzureBlobStorage('%s','%s','%s/%s/')", azblobEndpoint, b.cfg.AzureBlob.Container, b.cfg.AzureBlob.ObjectDiskPath, backupName), nil
}
return "", fmt.Errorf("provide azblob->container and azblob->account_name, azblob->account_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
}
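
For orientation: the settings slices built above and the location strings built here are combined into a single BACKUP statement by generateEmbeddedBackupSQL (see create.go below). A rough illustrative sketch of that composition, not the project's actual code; note the trailing slash this commit adds after the backup name:

```go
package main

import (
	"fmt"
	"strings"
)

// buildBackupSQL is illustrative only: it mimics how a settings slice and
// an embedded backup location compose into BACKUP ... TO ... SETTINGS.
func buildBackupSQL(tablesSQL, location string, settings []string) string {
	sql := fmt.Sprintf("BACKUP TABLE %s TO %s", tablesSQL, location)
	if len(settings) > 0 {
		sql += " SETTINGS " + strings.Join(settings, ", ")
	}
	return sql
}

func main() {
	// Mock endpoint and credentials; the trailing slash after "backup1"
	// matches the change in this commit.
	location := fmt.Sprintf("S3('%s/%s/','%s','%s')", "https://minio:9000/bucket", "backup1", "KEY", "SECRET")
	fmt.Println(buildBackupSQL("`default`.`t1`", location, []string{"allow_s3_native_copy=1"}))
}
```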
2 changes: 1 addition & 1 deletion pkg/backup/create.go
@@ -526,7 +526,7 @@ func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName str
tablesSQL += ", "
}
}
backupSettings := b.getEmbeddedBackupDefaultSettings(version)
backupSettings := b.getEmbeddedBackupSettings(version)
embeddedBackupLocation, err := b.getEmbeddedBackupLocation(ctx, backupName)
if err != nil {
return "", nil, err
6 changes: 4 additions & 2 deletions pkg/backup/restore.go
@@ -917,6 +917,7 @@ func (b *Backuper) fixEmbeddedMetadataRemote(ctx context.Context, backupName str
}
var fReader io.ReadCloser
remoteFilePath := path.Join(objectDiskPath, backupName, "metadata", fInfo.Name())
log.Debug().Msgf("read %s", remoteFilePath)
fReader, err = b.dst.GetFileReaderAbsolute(ctx, path.Join(objectDiskPath, backupName, "metadata", fInfo.Name()))
if err != nil {
return err
@@ -928,8 +929,9 @@ func (b *Backuper) fixEmbeddedMetadataRemote(ctx context.Context, backupName str
}
sqlQuery, sqlMetadataChanged, fixSqlErr := b.fixEmbeddedMetadataSQLQuery(ctx, sqlBytes, remoteFilePath, chVersion)
if fixSqlErr != nil {
return fixSqlErr
return fmt.Errorf("b.fixEmbeddedMetadataSQLQuery return error: %v", fixSqlErr)
}
log.Debug().Msgf("b.fixEmbeddedMetadataSQLQuery %s changed=%v", remoteFilePath, sqlMetadataChanged)
if sqlMetadataChanged {
err = b.dst.PutFileAbsolute(ctx, remoteFilePath, io.NopCloser(strings.NewReader(sqlQuery)))
if err != nil {
@@ -1653,7 +1655,7 @@ func (b *Backuper) restoreEmbedded(ctx context.Context, backupName string, schem
}
}
}
settings := b.getEmbeddedBackupDefaultSettings(version)
settings := b.getEmbeddedRestoreSettings(version)
if schemaOnly {
settings = append(settings, "structure_only=1")
}
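
The restore path mirrors the backup path: the slice from getEmbeddedRestoreSettings becomes the SETTINGS clause of a RESTORE statement, with structure_only=1 appended when only the schema is restored. An illustrative sketch of the resulting statement shape; location and table are mock values:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Shape of the statement restoreEmbedded builds when schemaOnly is true;
	// the setting names match the diff, everything else here is mock.
	settings := []string{"allow_s3_native_copy=1", "structure_only=1"}
	location := "S3('https://minio:9000/bucket/backup1/','KEY','SECRET')"
	fmt.Printf("RESTORE TABLE `default`.`t1` FROM %s SETTINGS %s\n", location, strings.Join(settings, ", "))
}
```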
2 changes: 1 addition & 1 deletion test/integration/config-azblob-embedded-url.yml
@@ -28,7 +28,7 @@ azblob:
endpoint_schema: http
container: container1
object_disk_path: object_disk/{cluster}/{shard}
path: backup
path: backup/{cluster}/{shard}
compression_format: none
api:
listen: :7171
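
The point of the path change: with several shards sharing one container, a flat backup path would collide across replicas, while the {cluster}/{shard} macros keep each shard's destination distinct. A sketch of the substitution, assuming the macro values come from system.macros; the helper name is hypothetical:

```go
package main

import (
	"fmt"
	"strings"
)

// expandMacros is a hypothetical helper: clickhouse-backup substitutes
// {cluster}/{shard}-style macros with values taken from system.macros.
func expandMacros(path, cluster, shard string) string {
	return strings.NewReplacer("{cluster}", cluster, "{shard}", shard).Replace(path)
}

func main() {
	fmt.Println(expandMacros("backup/{cluster}/{shard}", "cluster1", "0"))
	// backup/cluster1/0
}
```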
28 changes: 16 additions & 12 deletions test/integration/integration_test.go
@@ -638,29 +638,33 @@ func TestIntegrationEmbedded(t *testing.T) {
t.Logf("@TODO RESTORE Ordinary with old syntax still not works for %s version, look https://github.com/ClickHouse/ClickHouse/issues/43971", os.Getenv("CLICKHOUSE_VERSION"))
env, r := NewTestEnvironment(t)

//CUSTOM backup creates folder in each disk, need to clear
env.DockerExecNoError(r, "clickhouse", "rm", "-rfv", "/var/lib/clickhouse/disks/backups_s3/backup/")
env.runMainIntegrationScenario(t, "EMBEDDED_S3", "config-s3-embedded.yml")

// === AZURE ===
// CUSTOM backup create folder in each disk
env.DockerExecNoError(r, "clickhouse", "rm", "-rf", "/var/lib/clickhouse/disks/backups_azure/backup/")
env.runMainIntegrationScenario(t, "EMBEDDED_AZURE", "config-azblob-embedded.yml")
if compareVersion(version, "24.3") >= 0 {
if compareVersion(version, "24.8") >= 0 {
env.runMainIntegrationScenario(t, "EMBEDDED_AZURE_URL", "config-azblob-embedded-url.yml")
}
env.runMainIntegrationScenario(t, "EMBEDDED_AZURE", "config-azblob-embedded.yml")

// === GCS over S3 ===
if compareVersion(version, "24.3") >= 0 && os.Getenv("QA_GCS_OVER_S3_BUCKET") != "" {
//@todo think about named collections to avoid show credentials in logs look to https://github.com/fsouza/fake-gcs-server/issues/1330, https://github.com/fsouza/fake-gcs-server/pull/1164
env.InstallDebIfNotExists(r, "clickhouse-backup", "ca-certificates", "gettext-base")
env.DockerExecNoError(r, "clickhouse-backup", "bash", "-xec", "cat /etc/clickhouse-backup/config-gcs-embedded-url.yml.template | envsubst > /etc/clickhouse-backup/config-gcs-embedded-url.yml")
env.runMainIntegrationScenario(t, "EMBEDDED_GCS_URL", "config-gcs-embedded-url.yml")
}

// === S3 ===
// CUSTOM backup creates folder in each disk, need to clear
env.DockerExecNoError(r, "clickhouse", "rm", "-rfv", "/var/lib/clickhouse/disks/backups_s3/backup/")
env.runMainIntegrationScenario(t, "EMBEDDED_S3", "config-s3-embedded.yml")

if compareVersion(version, "23.8") >= 0 {
//CUSTOM backup creates folder in each disk, need to clear
env.DockerExecNoError(r, "clickhouse", "rm", "-rfv", "/var/lib/clickhouse/disks/backups_local/backup/")
env.runMainIntegrationScenario(t, "EMBEDDED_LOCAL", "config-s3-embedded-local.yml")
}
if compareVersion(version, "24.3") >= 0 {
if os.Getenv("QA_GCS_OVER_S3_BUCKET") != "" {
//@todo think about named collections to avoid show credentials in logs look to https://github.com/fsouza/fake-gcs-server/issues/1330, https://github.com/fsouza/fake-gcs-server/pull/1164
env.InstallDebIfNotExists(r, "clickhouse-backup", "ca-certificates", "gettext-base")
env.DockerExecNoError(r, "clickhouse-backup", "bash", "-xec", "cat /etc/clickhouse-backup/config-gcs-embedded-url.yml.template | envsubst > /etc/clickhouse-backup/config-gcs-embedded-url.yml")
env.runMainIntegrationScenario(t, "EMBEDDED_GCS_URL", "config-gcs-embedded-url.yml")
}
env.runMainIntegrationScenario(t, "EMBEDDED_S3_URL", "config-s3-embedded-url.yml")
}
//@TODO think about how to implements embedded backup for s3_plain disks
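
All of these scenarios gate on compareVersion, which the test suite uses to compare dotted ClickHouse version strings. A simplified sketch of such a comparator; the real helper may also special-case tags like latest:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// compareVersion is a simplified sketch of the test suite helper: it
// compares dotted numeric versions segment by segment, returning -1/0/1.
func compareVersion(a, b string) int {
	as, bs := strings.Split(a, "."), strings.Split(b, ".")
	for i := 0; i < len(as) || i < len(bs); i++ {
		var ai, bi int
		if i < len(as) {
			ai, _ = strconv.Atoi(as[i])
		}
		if i < len(bs) {
			bi, _ = strconv.Atoi(bs[i])
		}
		switch {
		case ai < bi:
			return -1
		case ai > bi:
			return 1
		}
	}
	return 0
}

func main() {
	fmt.Println(compareVersion("24.8.1", "24.3") >= 0) // true: runs EMBEDDED_AZURE_URL
	fmt.Println(compareVersion("23.3", "23.8") >= 0)   // false: skips EMBEDDED_LOCAL
}
```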