From f6db4e3eec8d604535904d5b4cbf3fc001542202 Mon Sep 17 00:00:00 2001 From: Slach Date: Tue, 11 Jun 2024 20:05:19 +0500 Subject: [PATCH] add http_send_timeout=300, http_receive_timeout=300 to connection parameters --- pkg/backup/backuper.go | 72 +++++++++++-------- pkg/backup/create.go | 28 ++++---- pkg/backup/restore.go | 16 ++--- pkg/clickhouse/clickhouse.go | 8 ++- test/integration/docker-compose.yml | 2 +- .../docker-compose/docker-compose.yml | 2 +- 6 files changed, 70 insertions(+), 58 deletions(-) diff --git a/pkg/backup/backuper.go b/pkg/backup/backuper.go index 9c16c5bb..b21175c6 100644 --- a/pkg/backup/backuper.go +++ b/pkg/backup/backuper.go @@ -181,6 +181,17 @@ func (b *Backuper) isDiskTypeEncryptedObject(disk clickhouse.Disk, disks []click return underlyingIdx >= 0 } +func (b *Backuper) getEmbeddedBackupDefaultSettings(version int) []string { + settings := []string{} + if b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs" { + settings = append(settings, "allow_s3_native_copy=1") + } + if b.cfg.General.RemoteStorage == "azblob" && version >= 24005001 { + settings = append(settings, "allow_azure_native_copy=1") + } + return settings +} + func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName string) (string, error) { if b.cfg.ClickHouse.EmbeddedBackupDisk != "" { return fmt.Sprintf("Disk('%s','%s')", b.cfg.ClickHouse.EmbeddedBackupDisk, backupName), nil @@ -214,7 +225,6 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str return fmt.Sprintf("S3('%s/%s','%s','%s')", gcsEndpoint, backupName, os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY")), nil } return "", fmt.Errorf("provide gcs->embedded_access_key and gcs->embedded_secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`") - } if b.cfg.General.RemoteStorage == "azblob" { azblobEndpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationAZBLOB()) @@ -242,57 +252,57 @@ func (b *Backuper) applyMacrosToObjectDiskPath(ctx context.Context) error { } func (b *Backuper) buildEmbeddedLocationS3() string { - url := url.URL{} - url.Scheme = "https" + s3backupURL := url.URL{} + s3backupURL.Scheme = "https" if strings.HasPrefix(b.cfg.S3.Endpoint, "http") { - newUrl, _ := url.Parse(b.cfg.S3.Endpoint) - url = *newUrl - url.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath) + newUrl, _ := s3backupURL.Parse(b.cfg.S3.Endpoint) + s3backupURL = *newUrl + s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath) } else { - url.Host = b.cfg.S3.Endpoint - url.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath) + s3backupURL.Host = b.cfg.S3.Endpoint + s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath) } if b.cfg.S3.DisableSSL { - url.Scheme = "http" + s3backupURL.Scheme = "http" } - if url.Host == "" && b.cfg.S3.Region != "" && b.cfg.S3.ForcePathStyle { - url.Host = "s3." + b.cfg.S3.Region + ".amazonaws.com" - url.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath) + if s3backupURL.Host == "" && b.cfg.S3.Region != "" && b.cfg.S3.ForcePathStyle { + s3backupURL.Host = "s3." + b.cfg.S3.Region + ".amazonaws.com" + s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath) } - if url.Host == "" && b.cfg.S3.Bucket != "" && !b.cfg.S3.ForcePathStyle { - url.Host = b.cfg.S3.Bucket + "." + "s3." + b.cfg.S3.Region + ".amazonaws.com" - url.Path = b.cfg.S3.ObjectDiskPath + if s3backupURL.Host == "" && b.cfg.S3.Bucket != "" && !b.cfg.S3.ForcePathStyle { + s3backupURL.Host = b.cfg.S3.Bucket + "." + "s3." + b.cfg.S3.Region + ".amazonaws.com" + s3backupURL.Path = b.cfg.S3.ObjectDiskPath } - return url.String() + return s3backupURL.String() } func (b *Backuper) buildEmbeddedLocationGCS() string { - url := url.URL{} - url.Scheme = "https" + gcsBackupURL := url.URL{} + gcsBackupURL.Scheme = "https" if b.cfg.GCS.ForceHttp { - url.Scheme = "http" + gcsBackupURL.Scheme = "http" } if b.cfg.GCS.Endpoint != "" { if !strings.HasPrefix(b.cfg.GCS.Endpoint, "http") { - url.Host = b.cfg.GCS.Endpoint + gcsBackupURL.Host = b.cfg.GCS.Endpoint } else { - newUrl, _ := url.Parse(b.cfg.GCS.Endpoint) - url = *newUrl + newUrl, _ := gcsBackupURL.Parse(b.cfg.GCS.Endpoint) + gcsBackupURL = *newUrl } } - if url.Host == "" { - url.Host = "storage.googleapis.com" + if gcsBackupURL.Host == "" { + gcsBackupURL.Host = "storage.googleapis.com" } - url.Path = path.Join(b.cfg.GCS.Bucket, b.cfg.GCS.ObjectDiskPath) - return url.String() + gcsBackupURL.Path = path.Join(b.cfg.GCS.Bucket, b.cfg.GCS.ObjectDiskPath) + return gcsBackupURL.String() } func (b *Backuper) buildEmbeddedLocationAZBLOB() string { - url := url.URL{} - url.Scheme = b.cfg.AzureBlob.EndpointSchema - url.Host = b.cfg.AzureBlob.EndpointSuffix - url.Path = b.cfg.AzureBlob.AccountName - return fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;", b.cfg.AzureBlob.EndpointSchema, b.cfg.AzureBlob.AccountName, b.cfg.AzureBlob.AccountKey, url.String()) + azblobBackupURL := url.URL{} + azblobBackupURL.Scheme = b.cfg.AzureBlob.EndpointSchema + azblobBackupURL.Host = b.cfg.AzureBlob.EndpointSuffix + azblobBackupURL.Path = b.cfg.AzureBlob.AccountName + return fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;", b.cfg.AzureBlob.EndpointSchema, b.cfg.AzureBlob.AccountName, b.cfg.AzureBlob.AccountKey, azblobBackupURL.String()) } func (b *Backuper) getObjectDiskPath() (string, error) { diff --git a/pkg/backup/create.go b/pkg/backup/create.go index e2f0dc10..a29f25f1 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -55,7 +55,7 @@ func NewBackupName() string { // CreateBackup - create new backup of all tables matched by tablePattern // If backupName is empty string will use default backup name -func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string, partitions []string, schemaOnly, createRBAC, rbacOnly, createConfigs, configsOnly, skipCheckPartsColumns bool, version string, commandId int) error { +func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string, partitions []string, schemaOnly, createRBAC, rbacOnly, createConfigs, configsOnly, skipCheckPartsColumns bool, backupVersion string, commandId int) error { ctx, cancel, err := status.Current.GetContextWithCancel(commandId) if err != nil { return err @@ -112,6 +112,10 @@ func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string, if err != nil { return err } + version, err := b.ch.GetVersion(ctx) + if err != nil { + return err + } b.DefaultDataPath, err = b.ch.GetDefaultPath(disks) if err != nil { return err @@ -130,9 +134,9 @@ func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string, return rbacAndConfigsErr } if b.cfg.ClickHouse.UseEmbeddedBackupRestore { - err = b.createBackupEmbedded(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, version, tablePattern, partitionsNameList, partitionsIdMap, tables, allDatabases, allFunctions, disks, diskMap, diskTypes, backupRBACSize, backupConfigSize, log, startBackup) + err = b.createBackupEmbedded(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, backupVersion, tablePattern, partitionsNameList, partitionsIdMap, tables, allDatabases, allFunctions, disks, diskMap, diskTypes, backupRBACSize, backupConfigSize, log, startBackup, version) } else { - err = b.createBackupLocal(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, rbacOnly, configsOnly, version, partitionsIdMap, tables, tablePattern, disks, diskMap, diskTypes, allDatabases, allFunctions, backupRBACSize, backupConfigSize, log, startBackup) + err = b.createBackupLocal(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, rbacOnly, configsOnly, backupVersion, partitionsIdMap, tables, tablePattern, disks, diskMap, diskTypes, allDatabases, allFunctions, backupRBACSize, backupConfigSize, log, startBackup, version) } if err != nil { // delete local backup if can't create @@ -185,7 +189,7 @@ func (b *Backuper) createRBACAndConfigsIfNecessary(ctx context.Context, backupNa return backupRBACSize, backupConfigSize, nil } -func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRemote string, doBackupData, schemaOnly, rbacOnly, configsOnly bool, backupVersion string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, tablePattern string, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time) error { +func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRemote string, doBackupData, schemaOnly, rbacOnly, configsOnly bool, backupVersion string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, tablePattern string, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time, version int) error { // Create backup dir on all clickhouse disks for _, disk := range disks { if err := filesystemhelper.Mkdir(path.Join(disk.Path, "backup"), b.ch, disks); err != nil { @@ -269,7 +273,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe log.Debug("create data") shadowBackupUUID := strings.ReplaceAll(uuid.New().String(), "-", "") var addTableToBackupErr error - disksToPartsMap, realSize, objectDiskSize, addTableToBackupErr = b.AddTableToLocalBackup(createCtx, backupName, tablesDiffFromRemote, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}]) + disksToPartsMap, realSize, objectDiskSize, addTableToBackupErr = b.AddTableToLocalBackup(createCtx, backupName, tablesDiffFromRemote, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}], version) if addTableToBackupErr != nil { log.Errorf("b.AddTableToLocalBackup error: %v", addTableToBackupErr) return addTableToBackupErr @@ -333,7 +337,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe return nil } -func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBackup string, doBackupData, schemaOnly bool, backupVersion, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time) error { +func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBackup string, doBackupData, schemaOnly bool, backupVersion, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time, version int) error { // TODO: Implement sharded backup operations for embedded backups if doesShard(b.cfg.General.ShardedOperationMode) { return fmt.Errorf("cannot perform embedded backup: %w", errShardOperationUnsupported) @@ -372,7 +376,7 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac } } - backupSQL, tablesSizeSQL, err := b.generateEmbeddedBackupSQL(ctx, backupName, schemaOnly, tables, tablesTitle, partitionsNameList, l, baseBackup) + backupSQL, tablesSizeSQL, err := b.generateEmbeddedBackupSQL(ctx, backupName, schemaOnly, tables, tablesTitle, partitionsNameList, l, baseBackup, version) if err != nil { return err } @@ -475,7 +479,7 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac return nil } -func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName string, schemaOnly bool, tables []clickhouse.Table, tablesTitle []metadata.TableTitle, partitionsNameList map[metadata.TableTitle][]string, tablesListLen int, baseBackup string) (string, map[metadata.TableTitle]string, error) { +func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName string, schemaOnly bool, tables []clickhouse.Table, tablesTitle []metadata.TableTitle, partitionsNameList map[metadata.TableTitle][]string, tablesListLen int, baseBackup string, version int) (string, map[metadata.TableTitle]string, error) { tablesSQL := "" tableSizeSQL := map[metadata.TableTitle]string{} i := 0 @@ -517,12 +521,12 @@ func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName str tablesSQL += ", " } } + backupSettings := b.getEmbeddedBackupDefaultSettings(version) embeddedBackupLocation, err := b.getEmbeddedBackupLocation(ctx, backupName) if err != nil { return "", nil, err } backupSQL := fmt.Sprintf("BACKUP %s TO %s", tablesSQL, embeddedBackupLocation) - backupSettings := []string{"http_send_timeout=300", "http_receive_timeout=300"} if schemaOnly { backupSettings = append(backupSettings, "structure_only=1") } @@ -728,7 +732,7 @@ func (b *Backuper) createBackupRBACReplicated(ctx context.Context, rbacBackup st return rbacDataSize, nil } -func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string, tablesDiffFromRemote map[metadata.TableTitle]metadata.TableMetadata, shadowBackupUUID string, diskList []clickhouse.Disk, table *clickhouse.Table, partitionsIdsMap common.EmptyMap) (map[string][]metadata.Part, map[string]int64, map[string]int64, error) { +func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string, tablesDiffFromRemote map[metadata.TableTitle]metadata.TableMetadata, shadowBackupUUID string, diskList []clickhouse.Disk, table *clickhouse.Table, partitionsIdsMap common.EmptyMap, version int) (map[string][]metadata.Part, map[string]int64, map[string]int64, error) { log := b.log.WithFields(apexLog.Fields{ "backup": backupName, "operation": "create", @@ -754,10 +758,6 @@ func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string, return nil, nil, nil, err } log.Debug("frozen") - version, err := b.ch.GetVersion(ctx) - if err != nil { - return nil, nil, nil, err - } realSize := map[string]int64{} objectDiskSize := map[string]int64{} disksToPartsMap := map[string][]metadata.Part{} diff --git a/pkg/backup/restore.go b/pkg/backup/restore.go index ef4fa084..3aef3e7f 100644 --- a/pkg/backup/restore.go +++ b/pkg/backup/restore.go @@ -207,7 +207,7 @@ func (b *Backuper) Restore(backupName, tablePattern string, databaseMapping, par } if dataOnly || (schemaOnly == dataOnly && !rbacOnly && !configsOnly) { - if err := b.RestoreData(ctx, backupName, backupMetadata, dataOnly, metadataPath, tablePattern, partitions, disks); err != nil { + if err := b.RestoreData(ctx, backupName, backupMetadata, dataOnly, metadataPath, tablePattern, partitions, disks, version); err != nil { return err } } @@ -856,7 +856,7 @@ func (b *Backuper) restoreSchemaEmbedded(ctx context.Context, backupName string, if err != nil { return err } - return b.restoreEmbedded(ctx, backupName, true, false, tablesForRestore, nil) + return b.restoreEmbedded(ctx, backupName, true, false, version, tablesForRestore, nil) } func (b *Backuper) fixEmbeddedMetadataRemote(ctx context.Context, backupName string, chVersion int) error { @@ -1123,7 +1123,7 @@ func (b *Backuper) dropExistsTables(tablesForDrop ListOfTables, ignoreDependenci } // RestoreData - restore data for tables matched by tablePattern from backupName -func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, dataOnly bool, metadataPath, tablePattern string, partitions []string, disks []clickhouse.Disk) error { +func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, dataOnly bool, metadataPath, tablePattern string, partitions []string, disks []clickhouse.Disk, version int) error { var err error startRestoreData := time.Now() log := apexLog.WithFields(apexLog.Fields{ @@ -1162,7 +1162,7 @@ func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMet } log.Debugf("found %d tables with data in backup", len(tablesForRestore)) if b.isEmbedded { - err = b.restoreDataEmbedded(ctx, backupName, dataOnly, tablesForRestore, partitionsNameList) + err = b.restoreDataEmbedded(ctx, backupName, dataOnly, version, tablesForRestore, partitionsNameList) } else { err = b.restoreDataRegular(ctx, backupName, backupMetadata, tablePattern, tablesForRestore, diskMap, diskTypes, disks, log) } @@ -1173,8 +1173,8 @@ func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMet return nil } -func (b *Backuper) restoreDataEmbedded(ctx context.Context, backupName string, dataOnly bool, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error { - return b.restoreEmbedded(ctx, backupName, false, dataOnly, tablesForRestore, partitionsNameList) +func (b *Backuper) restoreDataEmbedded(ctx context.Context, backupName string, dataOnly bool, version int, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error { + return b.restoreEmbedded(ctx, backupName, false, dataOnly, version, tablesForRestore, partitionsNameList) } func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, tablePattern string, tablesForRestore ListOfTables, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, log *apexLog.Entry) error { @@ -1505,7 +1505,7 @@ func (b *Backuper) changeTablePatternFromRestoreDatabaseMapping(tablePattern str return tablePattern } -func (b *Backuper) restoreEmbedded(ctx context.Context, backupName string, schemaOnly, dataOnly bool, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error { +func (b *Backuper) restoreEmbedded(ctx context.Context, backupName string, schemaOnly, dataOnly bool, version int, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error { tablesSQL := "" l := len(tablesForRestore) for i, t := range tablesForRestore { @@ -1540,7 +1540,7 @@ func (b *Backuper) restoreEmbedded(ctx context.Context, backupName string, schem } } } - settings := []string{"http_send_timeout=300", "http_receive_timeout=300"} + settings := b.getEmbeddedBackupDefaultSettings(version) if schemaOnly { settings = append(settings, "structure_only=1") } diff --git a/pkg/clickhouse/clickhouse.go b/pkg/clickhouse/clickhouse.go index 8b4fb514..d9162535 100644 --- a/pkg/clickhouse/clickhouse.go +++ b/pkg/clickhouse/clickhouse.go @@ -56,9 +56,11 @@ func (ch *ClickHouse) Connect() error { Password: ch.Config.Password, }, Settings: clickhouse.Settings{ - "connect_timeout": int(timeout.Seconds()), - "receive_timeout": int(timeout.Seconds()), - "send_timeout": int(timeout.Seconds()), + "connect_timeout": int(timeout.Seconds()), + "receive_timeout": int(timeout.Seconds()), + "send_timeout": int(timeout.Seconds()), + "http_send_timeout": 300, + "http_receive_timeout": 300, }, MaxOpenConns: ch.Config.MaxConnections, ConnMaxLifetime: 0, // don't change it, it related to SYSTEM SHUTDOWN behavior for properly rebuild RBAC lists on 20.4-22.3 diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index 402e6605..c662959f 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -88,7 +88,7 @@ services: zookeeper: # @TODO back :latest default value after resolve https://github.com/ClickHouse/ClickHouse/issues/53749 - image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.3} + image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.4} hostname: zookeeper environment: ZOO_4LW_COMMANDS_WHITELIST: "*" diff --git a/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml b/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml index 8410fd44..c9e46da1 100644 --- a/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml +++ b/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml @@ -3,7 +3,7 @@ version: '2.4' services: zookeeper: # @TODO back :latest default value after resolve https://github.com/ClickHouse/ClickHouse/issues/53749 - image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.3} + image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.4} expose: - "2181" environment: