debug #888, use dockerPool instead of up+down, remove partial download metadata for object disk required backup during `restore`
Slach committed Jul 29, 2024
1 parent d0f9dd8 commit 0e085c1
Showing 6 changed files with 163 additions and 72 deletions.
19 changes: 18 additions & 1 deletion .github/workflows/build.yaml
@@ -295,7 +295,7 @@ jobs:
QA_GCS_OVER_S3_SECRET_KEY: ${{ secrets.QA_GCS_OVER_S3_SECRET_KEY }}
QA_GCS_OVER_S3_BUCKET: ${{ secrets.QA_GCS_OVER_S3_BUCKET }}
run: |
set -x
set -xe
echo "CLICKHOUSE_VERSION=${CLICKHOUSE_VERSION}"
echo "GCS_TESTS=${GCS_TESTS}"
@@ -316,6 +316,23 @@ jobs:
export CUR_DIR="$(pwd)/test/integration"
export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup-race"
docker compose -f "${CUR_DIR}/${COMPOSE_FILE}" --progress=quiet pull
pids=()
for ((i = 1; i <= RUN_PARALLEL; i++)); do
docker compose -f ${CUR_DIR}/${COMPOSE_FILE} --project-name project${i} --progress plain up -d &
pids+=($!)
done
for pid in "${pids[@]}"; do
if wait "$pid"; then
echo "$pid docker compose up successful"
else
echo "$pid the docker compose up failed. Exiting."
exit 1 # Exit with an error code if any command fails
fi
done
go test -parallel ${RUN_PARALLEL} -timeout 60m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v test/integration/integration_test.go
- name: Format integration coverage
env:
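The pids array plus per-PID wait in the step above matters: a bare `wait` with no arguments returns 0 regardless of the children's exit codes, while `wait "$pid"` propagates each one, so any failed `docker compose up` fails the step. For comparison, the same fan-out and fail-fast shape in Go using errgroup (a sketch for illustration only, not part of the commit):

```go
package main

import (
	"fmt"
	"os/exec"

	"golang.org/x/sync/errgroup"
)

func main() {
	const runParallel = 4
	var g errgroup.Group
	for i := 1; i <= runParallel; i++ {
		projectName := fmt.Sprintf("project%d", i) // per-iteration copy for the closure
		g.Go(func() error {
			// Start one compose project per goroutine, mirroring the CI loop.
			cmd := exec.Command("docker", "compose", "--project-name", projectName, "up", "-d")
			return cmd.Run()
		})
	}
	if err := g.Wait(); err != nil { // first error wins, like the bash wait loop
		panic(err)
	}
}
```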
17 changes: 17 additions & 0 deletions pkg/backup/delete.go
@@ -441,3 +441,20 @@ func (b *Backuper) CleanRemoteBroken(commandId int) error {
}
return nil
}

func (b *Backuper) cleanPartialRequiredBackup(ctx context.Context, disks []clickhouse.Disk, currentBackupName string) error {
if localBackups, _, err := b.GetLocalBackups(ctx, disks); err == nil {
for _, localBackup := range localBackups {
if localBackup.BackupName != currentBackupName && localBackup.DataSize+localBackup.CompressedSize+localBackup.MetadataSize+localBackup.RBACSize == 0 {
if err = b.RemoveBackupLocal(ctx, localBackup.BackupName, disks); err != nil {
return fmt.Errorf("CleanPartialRequiredBackups %s -> RemoveBackupLocal cleaning error: %v", localBackup.BackupName, err)
} else {
b.log.Infof("CleanPartialRequiredBackups %s deleted", localBackup.BackupName)
}
}
}
} else {
return fmt.Errorf("CleanPartialRequiredBackups -> GetLocalBackups cleaning error: %v", err)
}
return nil
}
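The new helper keys off a simple heuristic: a local backup whose data, compressed, metadata, and RBAC sizes all sum to zero is an empty shell left behind by an interrupted download of a required (diff-base) backup. A minimal standalone sketch of that predicate, where `localBackup` is a hypothetical stand-in for the real metadata type:

```go
package main

import "fmt"

// localBackup is a hypothetical stand-in mirroring the size fields
// that cleanPartialRequiredBackup inspects in the diff above.
type localBackup struct {
	BackupName     string
	DataSize       uint64
	CompressedSize uint64
	MetadataSize   uint64
	RBACSize       uint64
}

// isPartialLeftover reproduces the heuristic: skip the backup currently
// being downloaded or restored, and flag anything whose sizes are all zero.
func isPartialLeftover(b localBackup, currentBackupName string) bool {
	return b.BackupName != currentBackupName &&
		b.DataSize+b.CompressedSize+b.MetadataSize+b.RBACSize == 0
}

func main() {
	backups := []localBackup{
		{BackupName: "full_1", DataSize: 1 << 20},
		{BackupName: "increment_1"}, // all size fields zero => partial leftover
	}
	for _, b := range backups {
		fmt.Printf("%s: partial leftover = %v\n", b.BackupName, isPartialLeftover(b, "full_1"))
	}
}
```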
22 changes: 6 additions & 16 deletions pkg/backup/download.go
@@ -270,26 +270,16 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [

//clean partially downloaded requiredBackup
if remoteBackup.RequiredBackup != "" {
if localBackups, _, err = b.GetLocalBackups(ctx, disks); err == nil {
for _, localBackup := range localBackups {
if localBackup.BackupName != remoteBackup.BackupName && localBackup.DataSize+localBackup.CompressedSize+localBackup.MetadataSize+localBackup.RBACSize == 0 {
if err = b.RemoveBackupLocal(ctx, localBackup.BackupName, disks); err != nil {
return fmt.Errorf("downloadWithDiff -> RemoveBackupLocal cleaning error: %v", err)
} else {
b.log.Infof("partial required backup %s deleted", localBackup.BackupName)
}
}
}
} else {
return fmt.Errorf("downloadWithDiff -> GetLocalBackups cleaning error: %v", err)
if err = b.cleanPartialRequiredBackup(ctx, disks, remoteBackup.BackupName); err != nil {
return err
}
}

log.WithFields(apexLog.Fields{
"duration": utils.HumanizeDuration(time.Since(startDownload)),
"download_size": utils.FormatBytes(dataSize + metadataSize + rbacSize + configSize),
"object_disk_size": utils.FormatBytes(backupMetadata.ObjectDiskSize),
"version": backupVersion,
"duration": utils.HumanizeDuration(time.Since(startDownload)),
"download_size": utils.FormatBytes(dataSize + metadataSize + rbacSize + configSize),
"object_disk_size": utils.FormatBytes(backupMetadata.ObjectDiskSize),
"version": backupVersion,
}).Info("done")
return nil
}
8 changes: 8 additions & 0 deletions pkg/backup/restore.go
@@ -223,6 +223,14 @@ func (b *Backuper) Restore(backupName, tablePattern string, databaseMapping, tab
}
}
}

//clean partially downloaded requiredBackup
if backupMetadata.RequiredBackup != "" {
if err = b.cleanPartialRequiredBackup(ctx, disks, backupMetadata.BackupName); err != nil {
return err
}
}

log.WithFields(apexLog.Fields{
"duration": utils.HumanizeDuration(time.Since(startRestore)),
"version": backupVersion,
120 changes: 69 additions & 51 deletions test/integration/integration_test.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
pool "github.com/jolestar/go-commons-pool/v2"
"math/rand"
"os"
"os/exec"
@@ -16,6 +17,7 @@
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -34,6 +36,9 @@
"github.com/Altinity/clickhouse-backup/v2/pkg/utils"
)

var projectId atomic.Uint32
var dockerPool *pool.ObjectPool

// setup log level
func init() {
log.SetHandler(logcli.New(os.Stdout))
@@ -45,6 +50,27 @@ func init() {
logLevel = os.Getenv("TEST_LOG_LEVEL")
}
log.SetLevelFromString(logLevel)

runParallel, isExists := os.LookupEnv("RUN_PARALLEL")
if !isExists {
runParallel = "1"
}
runParallelInt, err := strconv.Atoi(runParallel)
if err != nil {
log.Fatalf("invalid RUN_PARALLEL environment variable value %s", runParallel)
}

ctx := context.Background()
factory := pool.NewPooledObjectFactorySimple(
func(context.Context) (interface{}, error) {
projectId.Add(1)
env := TestEnvironment{
ProjectName: fmt.Sprintf("project%d", projectId.Load() % uint32(runParallelInt)),
}
return &env, nil
})
dockerPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory)
dockerPool.Config.MaxTotal = runParallelInt
}
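How the factory's naming plays out, as a standalone sketch assuming RUN_PARALLEL=4. Note that the modulo maps the fourth borrow to `project0`, while the workflow loop above starts `project1` through `project4`, so the two numbering schemes look off by one:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Standalone illustration of the pool factory's naming above, assuming RUN_PARALLEL=4.
func main() {
	var projectId atomic.Uint32
	const runParallelInt = 4
	for i := 0; i < runParallelInt; i++ {
		projectId.Add(1)
		// Prints project1, project2, project3, project0: the counter wraps to 0,
		// whereas the CI workflow starts compose projects 1 through RUN_PARALLEL.
		fmt.Printf("project%d\n", projectId.Load()%uint32(runParallelInt))
	}
}
```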

const dbNameAtomic = "_test#$.ДБ_atomic_"
@@ -414,52 +440,44 @@ var defaultIncrementData = []TestDataStruct{
}

func NewTestEnvironment(t *testing.T) (*TestEnvironment, *require.Assertions) {
t.Helper()
r := require.New(t)
if os.Getenv("COMPOSE_FILE") == "" || os.Getenv("CUR_DIR") == "" {
t.Fatal("please setup COMPOSE_FILE and CUR_DIR environment variables")
}
env := TestEnvironment{
ProjectName: "all",
t.Helper()
if os.Getenv("RUN_PARALLEL") != "1" /* && t.Name() != "TestLongListRemote" */ {
t.Parallel()
}
if os.Getenv("RUN_PARALLEL") != "1" {
if t.Name() != "TestLongListRemote" {
t.Logf("[%s] executing in parallel mode", t.Name())
t.Parallel()
} else {
t.Logf("[%s] executing in sequence mode", t.Name())
}
env.ProjectName = strings.ToLower(t.Name())
upCmd := append(env.GetDefaultComposeCommand(), "up", "-d")
upStart := time.Now()
out, err := utils.ExecCmdOut(context.Background(), dockerExecTimeout, "docker", upCmd...)
if err != nil {
logs, _ := utils.ExecCmdOut(context.Background(), dockerExecTimeout, "docker", append(env.GetDefaultComposeCommand(),"logs")...)
t.Log(logs)
}
r.NoError(err, "%s\n\n%s\n\n[ERROR]\n%v", "docker "+strings.Join(upCmd, " "), out, err)
t.Logf("%s docker compose up time = %s", t.Name(), time.Since(upStart))

r := require.New(t)
envObj, err := dockerPool.BorrowObject(context.Background())
if err != nil {
t.Fatalf("dockerPool.BorrowObject retrun error: %v", err)
}
env := envObj.(*TestEnvironment)

if os.Getenv("RUN_PARALLEL") != "1" /* && t.Name() != "TestLongListRemote" */ {
t.Logf("%s run in parallel mode project=%s", t.Name(), env.ProjectName)
} else {
t.Logf("[%s] executing in sequence mode", t.Name())
t.Logf("%s run in sequence mode project=%s", t.Name(), env.ProjectName)
}

if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "1.1.54394") <= 0 {
r := require.New(&testing.T{})
env.InstallDebIfNotExists(r, "clickhouse-backup", "ca-certificates", "curl")
env.DockerExecNoError(r, "clickhouse-backup", "update-ca-certificates")
}
return &env, r
return env, r
}

func (env *TestEnvironment) Cleanup(t *testing.T, r *require.Assertions) {
if "1" != os.Getenv("RUN_PARALLEL") {
downStart := time.Now()
env.ch.Close()
downCmd := append(env.GetDefaultComposeCommand(), "down", "--remove-orphans", "--volumes", "--timeout", "1")
out, err := utils.ExecCmdOut(context.Background(), dockerExecTimeout, "docker", downCmd...)
r.NoError(err, "%s\n\n%s\n\n[ERROR]\n%v", "docker "+strings.Join(downCmd, " "), out, err)
t.Logf("%s docker compose down time = %s", t.Name(), time.Since(downStart))
env.ch.Close()
if t.Name() == "TestIntegrationCustomRsync" {
env.DockerExecNoError(r, "sshd", "rm", "-rf", "/root/rsync_backups")
}
if err := dockerPool.ReturnObject(context.Background(), env); err != nil {
t.Fatalf("dockerPool.ReturnObject error: %+v", err)
}

}
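Taken together, init(), NewTestEnvironment, and Cleanup implement a borrow/return lifecycle on top of go-commons-pool: compose environments are started once by CI and then recycled across tests instead of being brought up and torn down per test. A condensed sketch of that lifecycle using the same go-commons-pool calls as the diff (error handling trimmed, names hypothetical):

```go
package main

import (
	"context"
	"fmt"

	pool "github.com/jolestar/go-commons-pool/v2"
)

type TestEnvironment struct{ ProjectName string }

func main() {
	ctx := context.Background()
	// The factory only hands out lightweight descriptors; the compose projects
	// themselves were already started by the CI workflow.
	factory := pool.NewPooledObjectFactorySimple(
		func(context.Context) (interface{}, error) {
			return &TestEnvironment{ProjectName: "project1"}, nil
		})
	p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
	p.Config.MaxTotal = 2 // caps concurrent borrowers, like RUN_PARALLEL

	envObj, err := p.BorrowObject(ctx) // NewTestEnvironment: blocks while all environments are in use
	if err != nil {
		panic(err)
	}
	env := envObj.(*TestEnvironment)
	fmt.Println("running test against", env.ProjectName)

	if err := p.ReturnObject(ctx, env); err != nil { // Cleanup: recycle, don't tear down
		panic(err)
	}
}
```

Because ReturnObject only recycles the descriptor, per-test residue (such as the rsync_backups directory above) has to be scrubbed explicitly before the environment goes back to the pool.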


@@ -2120,7 +2138,7 @@ func TestIntegrationAzure(t *testing.T) {
t.Skip("Skipping Azure integration tests...")
return
}
env, r := NewTestEnvironment(t)
env.runMainIntegrationScenario(t, "AZBLOB", "config-azblob.yml")
env.Cleanup(t, r)
}
@@ -2137,7 +2155,7 @@ func TestIntegrationGCS(t *testing.T) {
t.Skip("Skipping GCS integration tests...")
return
}
env, r := NewTestEnvironment(t)
env.runMainIntegrationScenario(t, "GCS", "config-gcs.yml")
env.Cleanup(t, r)
}
@@ -2147,19 +2165,19 @@ func TestIntegrationGCSWithCustomEndpoint(t *testing.T) {
t.Skip("Skipping GCS_EMULATOR integration tests...")
return
}
env, r := NewTestEnvironment(t)
env.runMainIntegrationScenario(t, "GCS_EMULATOR", "config-gcs-custom-endpoint.yml")
env.Cleanup(t, r)
}

func TestIntegrationSFTPAuthPassword(t *testing.T) {
env, r := NewTestEnvironment(t)
env.runMainIntegrationScenario(t, "SFTP", "config-sftp-auth-password.yaml")
env.Cleanup(t, r)
}

func TestIntegrationFTP(t *testing.T) {
env, r := NewTestEnvironment(t)
if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "21.3") >= 1 {
env.runMainIntegrationScenario(t, "FTP", "config-ftp.yaml")
} else {
@@ -2370,40 +2388,40 @@ func (env *TestEnvironment) runMainIntegrationScenario(t *testing.T, remoteStora
testBackupSpecifiedPartitions(t, r, env, remoteStorageType, backupConfig)

// main test scenario
testBackupName := fmt.Sprintf("%s_full_%d", t.Name(), rand.Int())
fullBackupName := fmt.Sprintf("%s_full_%d", t.Name(), rand.Int())
incrementBackupName := fmt.Sprintf("%s_increment_%d", t.Name(), rand.Int())
incrementBackupName2 := fmt.Sprintf("%s_increment2_%d", t.Name(), rand.Int())
databaseList := []string{dbNameOrdinary, dbNameAtomic, dbNameMySQL, dbNamePostgreSQL, Issue331Atomic, Issue331Ordinary}
tablesPattern := fmt.Sprintf("*_%s.*", t.Name())
log.Debug("Clean before start")
fullCleanup(t, r, env, []string{testBackupName, incrementBackupName}, []string{"remote", "local"}, databaseList, false, false, backupConfig)
fullCleanup(t, r, env, []string{fullBackupName, incrementBackupName}, []string{"remote", "local"}, databaseList, false, false, backupConfig)

env.DockerExecNoError(r, "minio", "mc", "ls", "local/clickhouse/disk_s3")
testData := generateTestData(t, r, env, remoteStorageType, defaultTestData)

env.DockerExecNoError(r, "minio", "mc", "ls", "local/clickhouse/disk_s3")

log.Debug("Create backup")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "create", "--tables", tablesPattern, testBackupName)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "create", "--tables", tablesPattern, fullBackupName)

incrementData := generateIncrementTestData(t, r, env, remoteStorageType, defaultIncrementData, 1)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "create", "--tables", tablesPattern, incrementBackupName)

log.Debug("Upload full")
uploadCmd := fmt.Sprintf("%s_COMPRESSION_FORMAT=zstd CLICKHOUSE_BACKUP_CONFIG=/etc/clickhouse-backup/%s clickhouse-backup upload --resume %s", remoteStorageType, backupConfig, testBackupName)
env.checkResumeAlreadyProcessed(uploadCmd, testBackupName, "upload", r, remoteStorageType)
uploadCmd := fmt.Sprintf("%s_COMPRESSION_FORMAT=zstd CLICKHOUSE_BACKUP_CONFIG=/etc/clickhouse-backup/%s clickhouse-backup upload --resume %s", remoteStorageType, backupConfig, fullBackupName)
env.checkResumeAlreadyProcessed(uploadCmd, fullBackupName, "upload", r, remoteStorageType)

// https://github.com/Altinity/clickhouse-backup/pull/900
if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "21.8") >= 0 {
log.Debug("create --diff-from-remote backup")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "create", "--diff-from-remote", testBackupName, "--tables", tablesPattern, incrementBackupName2)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "create", "--diff-from-remote", fullBackupName, "--tables", tablesPattern, incrementBackupName2)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "upload", incrementBackupName2)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "delete", "remote", incrementBackupName2)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "delete", "local", incrementBackupName2)
}

log.Debug("Upload increment")
uploadCmd = fmt.Sprintf("clickhouse-backup -c /etc/clickhouse-backup/%s upload %s --diff-from-remote %s --resume", backupConfig, incrementBackupName, testBackupName)
uploadCmd = fmt.Sprintf("clickhouse-backup -c /etc/clickhouse-backup/%s upload %s --diff-from-remote %s --resume", backupConfig, incrementBackupName, fullBackupName)
env.checkResumeAlreadyProcessed(uploadCmd, incrementBackupName, "upload", r, remoteStorageType)

backupDir := "/var/lib/clickhouse/backup"
Expand All @@ -2414,7 +2432,7 @@ func (env *TestEnvironment) runMainIntegrationScenario(t *testing.T, remoteStora
r.NoError(err)
r.Equal(2, len(strings.Split(strings.Trim(out, " \t\r\n"), "\n")), "expect '2' backups exists in backup directory")
log.Debug("Delete backup")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "delete", "local", testBackupName)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "delete", "local", fullBackupName)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "delete", "local", incrementBackupName)
out, err = env.DockerExecOut("clickhouse-backup", "bash", "-ce", "ls -lha "+backupDir+" | grep "+t.Name())
r.NotNil(err)
@@ -2424,17 +2442,17 @@ func (env *TestEnvironment) runMainIntegrationScenario(t *testing.T, remoteStora

log.Debug("Download")
replaceStorageDiskNameForReBalance(r, env, remoteStorageType, false)
downloadCmd := fmt.Sprintf("clickhouse-backup -c /etc/clickhouse-backup/%s download --resume %s", backupConfig, testBackupName)
env.checkResumeAlreadyProcessed(downloadCmd, testBackupName, "download", r, remoteStorageType)
downloadCmd := fmt.Sprintf("clickhouse-backup -c /etc/clickhouse-backup/%s download --resume %s", backupConfig, fullBackupName)
env.checkResumeAlreadyProcessed(downloadCmd, fullBackupName, "download", r, remoteStorageType)

log.Debug("Restore schema")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "restore", "--schema", testBackupName)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "restore", "--schema", fullBackupName)

log.Debug("Restore data")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "restore", "--data", testBackupName)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "restore", "--data", fullBackupName)

log.Debug("Full restore with rm")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "restore", "--rm", testBackupName)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "restore", "--rm", fullBackupName)

log.Debug("Check data")
for i := range testData {
@@ -2451,7 +2469,7 @@ func (env *TestEnvironment) runMainIntegrationScenario(t *testing.T, remoteStora
dropDatabasesFromTestDataDataSet(t, r, env, databaseList)

log.Debug("Delete backup")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "delete", "local", testBackupName)
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/"+backupConfig, "delete", "local", fullBackupName)

log.Debug("Download increment")
downloadCmd = fmt.Sprintf("clickhouse-backup -c /etc/clickhouse-backup/%s download --resume %s", backupConfig, incrementBackupName)
@@ -2480,9 +2498,9 @@ func (env *TestEnvironment) runMainIntegrationScenario(t *testing.T, remoteStora

// test end
log.Debug("Clean after finish")
// during download increment, partially downloaded full will be cleaned
// during download increment, partially downloaded full will also be cleaned
fullCleanup(t, r, env, []string{incrementBackupName}, []string{"local"}, nil, true, false, backupConfig)
fullCleanup(t, r, env, []string{testBackupName, incrementBackupName}, []string{"remote"}, databaseList, true, true, backupConfig)
fullCleanup(t, r, env, []string{fullBackupName, incrementBackupName}, []string{"remote"}, databaseList, true, true, backupConfig)
replaceStorageDiskNameForReBalance(r, env, remoteStorageType, true)
env.checkObjectStorageIsEmpty(t, r, remoteStorageType)
}