add http_send_timeout=300, http_receive_timeout=300 to connection parameters
Slach committed Jun 11, 2024
1 parent 8c36148 commit f6db4e3
Showing 6 changed files with 70 additions and 58 deletions.
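
Taken together, the diff below does a few related things: it hardcodes http_send_timeout=300 and http_receive_timeout=300 into the clickhouse-go connection settings, moves the per-storage defaults for embedded BACKUP/RESTORE statements into a new getEmbeddedBackupDefaultSettings helper (allow_s3_native_copy for s3/gcs, allow_azure_native_copy for azblob on servers >= 24005001), renames local url variables that shadowed the net/url package, threads the server version down from CreateBackup instead of re-querying it per table, and bumps the Zookeeper image in the integration-test compose files to 3.8.4. As a rough sketch of the end result — the table and S3 location below are invented for illustration, not taken from this diff — the embedded backup path now finishes its statement like this:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative reconstruction of how the SETTINGS clause is assembled
	// after this commit: version-gated defaults first, then structure_only
	// for schema-only runs.
	backupSettings := []string{"allow_s3_native_copy=1"} // getEmbeddedBackupDefaultSettings result for s3/gcs
	schemaOnly := true
	if schemaOnly {
		backupSettings = append(backupSettings, "structure_only=1")
	}
	backupSQL := "BACKUP TABLE `default`.`events` TO S3('https://s3.us-east-1.amazonaws.com/bucket/backup/shard1')"
	if len(backupSettings) > 0 {
		backupSQL += " SETTINGS " + strings.Join(backupSettings, ", ")
	}
	fmt.Println(backupSQL)
	// BACKUP TABLE `default`.`events` TO S3('...') SETTINGS allow_s3_native_copy=1, structure_only=1
}
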
72 changes: 41 additions & 31 deletions pkg/backup/backuper.go
@@ -181,6 +181,17 @@ func (b *Backuper) isDiskTypeEncryptedObject(disk clickhouse.Disk, disks []click
 	return underlyingIdx >= 0
 }
 
+func (b *Backuper) getEmbeddedBackupDefaultSettings(version int) []string {
+	settings := []string{}
+	if b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs" {
+		settings = append(settings, "allow_s3_native_copy=1")
+	}
+	if b.cfg.General.RemoteStorage == "azblob" && version >= 24005001 {
+		settings = append(settings, "allow_azure_native_copy=1")
+	}
+	return settings
+}
+
 func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName string) (string, error) {
 	if b.cfg.ClickHouse.EmbeddedBackupDisk != "" {
 		return fmt.Sprintf("Disk('%s','%s')", b.cfg.ClickHouse.EmbeddedBackupDisk, backupName), nil
@@ -214,7 +225,6 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
 			return fmt.Sprintf("S3('%s/%s','%s','%s')", gcsEndpoint, backupName, os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY")), nil
 		}
 		return "", fmt.Errorf("provide gcs->embedded_access_key and gcs->embedded_secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
-
 	}
 	if b.cfg.General.RemoteStorage == "azblob" {
 		azblobEndpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationAZBLOB())
@@ -242,57 +252,57 @@ func (b *Backuper) applyMacrosToObjectDiskPath(ctx context.Context) error {
 }
 
 func (b *Backuper) buildEmbeddedLocationS3() string {
-	url := url.URL{}
-	url.Scheme = "https"
+	s3backupURL := url.URL{}
+	s3backupURL.Scheme = "https"
 	if strings.HasPrefix(b.cfg.S3.Endpoint, "http") {
-		newUrl, _ := url.Parse(b.cfg.S3.Endpoint)
-		url = *newUrl
-		url.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
+		newUrl, _ := s3backupURL.Parse(b.cfg.S3.Endpoint)
+		s3backupURL = *newUrl
+		s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
 	} else {
-		url.Host = b.cfg.S3.Endpoint
-		url.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
+		s3backupURL.Host = b.cfg.S3.Endpoint
+		s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
 	}
 	if b.cfg.S3.DisableSSL {
-		url.Scheme = "http"
+		s3backupURL.Scheme = "http"
 	}
-	if url.Host == "" && b.cfg.S3.Region != "" && b.cfg.S3.ForcePathStyle {
-		url.Host = "s3." + b.cfg.S3.Region + ".amazonaws.com"
-		url.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
+	if s3backupURL.Host == "" && b.cfg.S3.Region != "" && b.cfg.S3.ForcePathStyle {
+		s3backupURL.Host = "s3." + b.cfg.S3.Region + ".amazonaws.com"
+		s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
 	}
-	if url.Host == "" && b.cfg.S3.Bucket != "" && !b.cfg.S3.ForcePathStyle {
-		url.Host = b.cfg.S3.Bucket + "." + "s3." + b.cfg.S3.Region + ".amazonaws.com"
-		url.Path = b.cfg.S3.ObjectDiskPath
+	if s3backupURL.Host == "" && b.cfg.S3.Bucket != "" && !b.cfg.S3.ForcePathStyle {
+		s3backupURL.Host = b.cfg.S3.Bucket + "." + "s3." + b.cfg.S3.Region + ".amazonaws.com"
+		s3backupURL.Path = b.cfg.S3.ObjectDiskPath
 	}
-	return url.String()
+	return s3backupURL.String()
 }
 
 func (b *Backuper) buildEmbeddedLocationGCS() string {
-	url := url.URL{}
-	url.Scheme = "https"
+	gcsBackupURL := url.URL{}
+	gcsBackupURL.Scheme = "https"
 	if b.cfg.GCS.ForceHttp {
-		url.Scheme = "http"
+		gcsBackupURL.Scheme = "http"
 	}
 	if b.cfg.GCS.Endpoint != "" {
 		if !strings.HasPrefix(b.cfg.GCS.Endpoint, "http") {
-			url.Host = b.cfg.GCS.Endpoint
+			gcsBackupURL.Host = b.cfg.GCS.Endpoint
 		} else {
-			newUrl, _ := url.Parse(b.cfg.GCS.Endpoint)
-			url = *newUrl
+			newUrl, _ := gcsBackupURL.Parse(b.cfg.GCS.Endpoint)
+			gcsBackupURL = *newUrl
 		}
 	}
-	if url.Host == "" {
-		url.Host = "storage.googleapis.com"
+	if gcsBackupURL.Host == "" {
+		gcsBackupURL.Host = "storage.googleapis.com"
 	}
-	url.Path = path.Join(b.cfg.GCS.Bucket, b.cfg.GCS.ObjectDiskPath)
-	return url.String()
+	gcsBackupURL.Path = path.Join(b.cfg.GCS.Bucket, b.cfg.GCS.ObjectDiskPath)
+	return gcsBackupURL.String()
 }
 
 func (b *Backuper) buildEmbeddedLocationAZBLOB() string {
-	url := url.URL{}
-	url.Scheme = b.cfg.AzureBlob.EndpointSchema
-	url.Host = b.cfg.AzureBlob.EndpointSuffix
-	url.Path = b.cfg.AzureBlob.AccountName
-	return fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;", b.cfg.AzureBlob.EndpointSchema, b.cfg.AzureBlob.AccountName, b.cfg.AzureBlob.AccountKey, url.String())
+	azblobBackupURL := url.URL{}
+	azblobBackupURL.Scheme = b.cfg.AzureBlob.EndpointSchema
+	azblobBackupURL.Host = b.cfg.AzureBlob.EndpointSuffix
+	azblobBackupURL.Path = b.cfg.AzureBlob.AccountName
+	return fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;", b.cfg.AzureBlob.EndpointSchema, b.cfg.AzureBlob.AccountName, b.cfg.AzureBlob.AccountKey, azblobBackupURL.String())
 }
 
 func (b *Backuper) getObjectDiskPath() (string, error) {
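
A note on the renames above: in the old code the local variable url shadowed the imported net/url package, so url.Parse(...) was actually the (*url.URL).Parse method called on the local value, not the package-level function — for absolute endpoint URLs the two agree, which is why the old code still worked. The rename to s3backupURL/gcsBackupURL/azblobBackupURL keeps the method-call behavior while restoring access to the package name. A minimal standalone illustration of the shadowing, with made-up values:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// After this short variable declaration, the identifier url no longer
	// refers to the net/url package anywhere below in this scope.
	url := url.URL{Scheme: "https", Host: "s3.us-east-1.amazonaws.com"}
	// This is the (*url.URL).Parse METHOD (resolves a reference against the
	// receiver), not the package-level url.Parse function.
	parsed, err := url.Parse("/bucket/object-disk-path")
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed) // https://s3.us-east-1.amazonaws.com/bucket/object-disk-path
}
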
28 changes: 14 additions & 14 deletions pkg/backup/create.go
@@ -55,7 +55,7 @@ func NewBackupName() string {
 
 // CreateBackup - create new backup of all tables matched by tablePattern
 // If backupName is empty string will use default backup name
-func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string, partitions []string, schemaOnly, createRBAC, rbacOnly, createConfigs, configsOnly, skipCheckPartsColumns bool, version string, commandId int) error {
+func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string, partitions []string, schemaOnly, createRBAC, rbacOnly, createConfigs, configsOnly, skipCheckPartsColumns bool, backupVersion string, commandId int) error {
 	ctx, cancel, err := status.Current.GetContextWithCancel(commandId)
 	if err != nil {
 		return err
@@ -112,6 +112,10 @@ func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string,
 	if err != nil {
 		return err
 	}
+	version, err := b.ch.GetVersion(ctx)
+	if err != nil {
+		return err
+	}
 	b.DefaultDataPath, err = b.ch.GetDefaultPath(disks)
 	if err != nil {
 		return err
@@ -130,9 +134,9 @@ func (b *Backuper) CreateBackup(backupName, diffFromRemote, tablePattern string,
 		return rbacAndConfigsErr
 	}
 	if b.cfg.ClickHouse.UseEmbeddedBackupRestore {
-		err = b.createBackupEmbedded(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, version, tablePattern, partitionsNameList, partitionsIdMap, tables, allDatabases, allFunctions, disks, diskMap, diskTypes, backupRBACSize, backupConfigSize, log, startBackup)
+		err = b.createBackupEmbedded(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, backupVersion, tablePattern, partitionsNameList, partitionsIdMap, tables, allDatabases, allFunctions, disks, diskMap, diskTypes, backupRBACSize, backupConfigSize, log, startBackup, version)
 	} else {
-		err = b.createBackupLocal(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, rbacOnly, configsOnly, version, partitionsIdMap, tables, tablePattern, disks, diskMap, diskTypes, allDatabases, allFunctions, backupRBACSize, backupConfigSize, log, startBackup)
+		err = b.createBackupLocal(ctx, backupName, diffFromRemote, doBackupData, schemaOnly, rbacOnly, configsOnly, backupVersion, partitionsIdMap, tables, tablePattern, disks, diskMap, diskTypes, allDatabases, allFunctions, backupRBACSize, backupConfigSize, log, startBackup, version)
 	}
 	if err != nil {
 		// delete local backup if can't create
@@ -185,7 +189,7 @@ func (b *Backuper) createRBACAndConfigsIfNecessary(ctx context.Context, backupNa
 	return backupRBACSize, backupConfigSize, nil
 }
 
-func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRemote string, doBackupData, schemaOnly, rbacOnly, configsOnly bool, backupVersion string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, tablePattern string, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time) error {
+func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRemote string, doBackupData, schemaOnly, rbacOnly, configsOnly bool, backupVersion string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, tablePattern string, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time, version int) error {
 	// Create backup dir on all clickhouse disks
 	for _, disk := range disks {
 		if err := filesystemhelper.Mkdir(path.Join(disk.Path, "backup"), b.ch, disks); err != nil {
@@ -269,7 +273,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
 			log.Debug("create data")
 			shadowBackupUUID := strings.ReplaceAll(uuid.New().String(), "-", "")
 			var addTableToBackupErr error
-			disksToPartsMap, realSize, objectDiskSize, addTableToBackupErr = b.AddTableToLocalBackup(createCtx, backupName, tablesDiffFromRemote, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}])
+			disksToPartsMap, realSize, objectDiskSize, addTableToBackupErr = b.AddTableToLocalBackup(createCtx, backupName, tablesDiffFromRemote, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}], version)
 			if addTableToBackupErr != nil {
 				log.Errorf("b.AddTableToLocalBackup error: %v", addTableToBackupErr)
 				return addTableToBackupErr
@@ -333,7 +337,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
 	return nil
 }
 
-func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBackup string, doBackupData, schemaOnly bool, backupVersion, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time) error {
+func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBackup string, doBackupData, schemaOnly bool, backupVersion, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, backupRBACSize, backupConfigSize uint64, log *apexLog.Entry, startBackup time.Time, version int) error {
 	// TODO: Implement sharded backup operations for embedded backups
 	if doesShard(b.cfg.General.ShardedOperationMode) {
 		return fmt.Errorf("cannot perform embedded backup: %w", errShardOperationUnsupported)
@@ -372,7 +376,7 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac
 		}
 	}
 
-	backupSQL, tablesSizeSQL, err := b.generateEmbeddedBackupSQL(ctx, backupName, schemaOnly, tables, tablesTitle, partitionsNameList, l, baseBackup)
+	backupSQL, tablesSizeSQL, err := b.generateEmbeddedBackupSQL(ctx, backupName, schemaOnly, tables, tablesTitle, partitionsNameList, l, baseBackup, version)
 	if err != nil {
 		return err
 	}
@@ -475,7 +479,7 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac
 	return nil
 }
 
-func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName string, schemaOnly bool, tables []clickhouse.Table, tablesTitle []metadata.TableTitle, partitionsNameList map[metadata.TableTitle][]string, tablesListLen int, baseBackup string) (string, map[metadata.TableTitle]string, error) {
+func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName string, schemaOnly bool, tables []clickhouse.Table, tablesTitle []metadata.TableTitle, partitionsNameList map[metadata.TableTitle][]string, tablesListLen int, baseBackup string, version int) (string, map[metadata.TableTitle]string, error) {
 	tablesSQL := ""
 	tableSizeSQL := map[metadata.TableTitle]string{}
 	i := 0
@@ -517,12 +521,12 @@ func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName str
 			tablesSQL += ", "
 		}
 	}
+	backupSettings := b.getEmbeddedBackupDefaultSettings(version)
 	embeddedBackupLocation, err := b.getEmbeddedBackupLocation(ctx, backupName)
 	if err != nil {
 		return "", nil, err
 	}
 	backupSQL := fmt.Sprintf("BACKUP %s TO %s", tablesSQL, embeddedBackupLocation)
-	backupSettings := []string{"http_send_timeout=300", "http_receive_timeout=300"}
 	if schemaOnly {
 		backupSettings = append(backupSettings, "structure_only=1")
 	}
@@ -728,7 +732,7 @@ func (b *Backuper) createBackupRBACReplicated(ctx context.Context, rbacBackup st
 	return rbacDataSize, nil
 }
 
-func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string, tablesDiffFromRemote map[metadata.TableTitle]metadata.TableMetadata, shadowBackupUUID string, diskList []clickhouse.Disk, table *clickhouse.Table, partitionsIdsMap common.EmptyMap) (map[string][]metadata.Part, map[string]int64, map[string]int64, error) {
+func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string, tablesDiffFromRemote map[metadata.TableTitle]metadata.TableMetadata, shadowBackupUUID string, diskList []clickhouse.Disk, table *clickhouse.Table, partitionsIdsMap common.EmptyMap, version int) (map[string][]metadata.Part, map[string]int64, map[string]int64, error) {
 	log := b.log.WithFields(apexLog.Fields{
 		"backup":    backupName,
 		"operation": "create",
@@ -754,10 +758,6 @@ func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string,
 		return nil, nil, nil, err
 	}
 	log.Debug("frozen")
-	version, err := b.ch.GetVersion(ctx)
-	if err != nil {
-		return nil, nil, nil, err
-	}
 	realSize := map[string]int64{}
 	objectDiskSize := map[string]int64{}
 	disksToPartsMap := map[string][]metadata.Part{}
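
The version plumbing above is worth spelling out: CreateBackup now calls b.ch.GetVersion(ctx) once and hands the int down through createBackupLocal/createBackupEmbedded to AddTableToLocalBackup and generateEmbeddedBackupSQL, instead of AddTableToLocalBackup re-querying the server for every table. The 24005001 threshold in getEmbeddedBackupDefaultSettings suggests the usual ClickHouse integer encoding of major*1000000 + minor*1000 + patch (so 24005001 would be 24.5.1); that encoding is an inference here, not stated anywhere in the diff:

package main

import "fmt"

// versionAtLeast assumes the encoding major*1000000 + minor*1000 + patch,
// inferred from the 24005001 gate; treat it as a sketch, not project API.
func versionAtLeast(version, major, minor, patch int) bool {
	return version >= major*1000000 + minor*1000 + patch
}

func main() {
	fmt.Println(versionAtLeast(24005001, 24, 5, 1)) // true  -> allow_azure_native_copy=1 added for azblob
	fmt.Println(versionAtLeast(23008002, 24, 5, 1)) // false -> setting omitted
}
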
16 changes: 8 additions & 8 deletions pkg/backup/restore.go
@@ -207,7 +207,7 @@ func (b *Backuper) Restore(backupName, tablePattern string, databaseMapping, par
 
 	}
 	if dataOnly || (schemaOnly == dataOnly && !rbacOnly && !configsOnly) {
-		if err := b.RestoreData(ctx, backupName, backupMetadata, dataOnly, metadataPath, tablePattern, partitions, disks); err != nil {
+		if err := b.RestoreData(ctx, backupName, backupMetadata, dataOnly, metadataPath, tablePattern, partitions, disks, version); err != nil {
 			return err
 		}
 	}
@@ -856,7 +856,7 @@ func (b *Backuper) restoreSchemaEmbedded(ctx context.Context, backupName string,
 	if err != nil {
 		return err
 	}
-	return b.restoreEmbedded(ctx, backupName, true, false, tablesForRestore, nil)
+	return b.restoreEmbedded(ctx, backupName, true, false, version, tablesForRestore, nil)
 }
 
 func (b *Backuper) fixEmbeddedMetadataRemote(ctx context.Context, backupName string, chVersion int) error {
@@ -1123,7 +1123,7 @@ func (b *Backuper) dropExistsTables(tablesForDrop ListOfTables, ignoreDependenci
 }
 
 // RestoreData - restore data for tables matched by tablePattern from backupName
-func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, dataOnly bool, metadataPath, tablePattern string, partitions []string, disks []clickhouse.Disk) error {
+func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, dataOnly bool, metadataPath, tablePattern string, partitions []string, disks []clickhouse.Disk, version int) error {
 	var err error
 	startRestoreData := time.Now()
 	log := apexLog.WithFields(apexLog.Fields{
@@ -1162,7 +1162,7 @@ func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMet
 	}
 	log.Debugf("found %d tables with data in backup", len(tablesForRestore))
 	if b.isEmbedded {
-		err = b.restoreDataEmbedded(ctx, backupName, dataOnly, tablesForRestore, partitionsNameList)
+		err = b.restoreDataEmbedded(ctx, backupName, dataOnly, version, tablesForRestore, partitionsNameList)
 	} else {
 		err = b.restoreDataRegular(ctx, backupName, backupMetadata, tablePattern, tablesForRestore, diskMap, diskTypes, disks, log)
 	}
@@ -1173,8 +1173,8 @@ func (b *Backuper) RestoreData(ctx context.Context, backupName string, backupMet
 	return nil
 }
 
-func (b *Backuper) restoreDataEmbedded(ctx context.Context, backupName string, dataOnly bool, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error {
-	return b.restoreEmbedded(ctx, backupName, false, dataOnly, tablesForRestore, partitionsNameList)
+func (b *Backuper) restoreDataEmbedded(ctx context.Context, backupName string, dataOnly bool, version int, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error {
+	return b.restoreEmbedded(ctx, backupName, false, dataOnly, version, tablesForRestore, partitionsNameList)
 }
 
 func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, tablePattern string, tablesForRestore ListOfTables, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, log *apexLog.Entry) error {
@@ -1505,7 +1505,7 @@ func (b *Backuper) changeTablePatternFromRestoreDatabaseMapping(tablePattern str
 	return tablePattern
 }
 
-func (b *Backuper) restoreEmbedded(ctx context.Context, backupName string, schemaOnly, dataOnly bool, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error {
+func (b *Backuper) restoreEmbedded(ctx context.Context, backupName string, schemaOnly, dataOnly bool, version int, tablesForRestore ListOfTables, partitionsNameList map[metadata.TableTitle][]string) error {
 	tablesSQL := ""
 	l := len(tablesForRestore)
 	for i, t := range tablesForRestore {
@@ -1540,7 +1540,7 @@ func (b *Backuper) restoreEmbedded(ctx context.Context, backupName string, schem
 			}
 		}
 	}
-	settings := []string{"http_send_timeout=300", "http_receive_timeout=300"}
+	settings := b.getEmbeddedBackupDefaultSettings(version)
 	if schemaOnly {
 		settings = append(settings, "structure_only=1")
 	}
8 changes: 5 additions & 3 deletions pkg/clickhouse/clickhouse.go
@@ -56,9 +56,11 @@ func (ch *ClickHouse) Connect() error {
 			Password: ch.Config.Password,
 		},
 		Settings: clickhouse.Settings{
-			"connect_timeout": int(timeout.Seconds()),
-			"receive_timeout": int(timeout.Seconds()),
-			"send_timeout":    int(timeout.Seconds()),
+			"connect_timeout":      int(timeout.Seconds()),
+			"receive_timeout":      int(timeout.Seconds()),
+			"send_timeout":         int(timeout.Seconds()),
+			"http_send_timeout":    300,
+			"http_receive_timeout": 300,
 		},
 		MaxOpenConns:    ch.Config.MaxConnections,
 		ConnMaxLifetime: 0, // don't change it, it related to SYSTEM SHUTDOWN behavior for properly rebuild RBAC lists on 20.4-22.3
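
These two driver-level settings mirror what the embedded path previously injected into every BACKUP/RESTORE query: each session opened by clickhouse-backup now asks the server for five-minute HTTP send/receive timeouts instead of relying on the server defaults. A minimal sketch of an equivalent connection with the clickhouse-go v2 client, assuming that is the driver in use here; the address and the 30-second values are placeholders, not taken from this repo:

package main

import (
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	// Open a native-protocol connection carrying the same session settings.
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{"127.0.0.1:9000"},
		Settings: clickhouse.Settings{
			"connect_timeout":      30,
			"receive_timeout":      30,
			"send_timeout":         30,
			"http_send_timeout":    300, // new in this commit
			"http_receive_timeout": 300, // new in this commit
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	log.Println("connected with http_send_timeout/http_receive_timeout = 300")
}
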
2 changes: 1 addition & 1 deletion test/integration/docker-compose.yml
@@ -88,7 +88,7 @@ services:
 
   zookeeper:
     # @TODO back :latest default value after resolve https://github.com/ClickHouse/ClickHouse/issues/53749
-    image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.3}
+    image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.4}
     hostname: zookeeper
     environment:
       ZOO_4LW_COMMANDS_WHITELIST: "*"
2 changes: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ version: '2.4'
 services:
   zookeeper:
     # @TODO back :latest default value after resolve https://github.com/ClickHouse/ClickHouse/issues/53749
-    image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.3}
+    image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.4}
     expose:
       - "2181"
     environment:
