diff --git a/.ci/integration_test b/.ci/integration_test index aeb731d1d..6879b3603 100755 --- a/.ci/integration_test +++ b/.ci/integration_test @@ -77,7 +77,7 @@ function setup_ginkgo() { function setup_etcd(){ echo "Downloading and installing etcd..." - export ETCD_VER=v3.3.8 + export ETCD_VER=v3.4.13 if [[ $(uname) == 'Darwin' ]]; then curl -L https://storage.googleapis.com/etcd/${ETCD_VER}/etcd-${ETCD_VER}-darwin-amd64.zip -o etcd-${ETCD_VER}-darwin-amd64.zip unzip etcd-${ETCD_VER}-darwin-amd64.zip @@ -196,6 +196,11 @@ function cleanup-aws-infrastructure() { echo "AWS infrastructure cleanup completed." } +function remove-etcd-data-directory() { + echo "Removing ETCD Data Directory" + rm -rf ${ETCD_DATA_DIR} + } + ############################# # Azure Setup # ############################# @@ -229,6 +234,7 @@ function setup_test_cluster() { function cleanup_test_environment() { cleanup-aws-infrastructure + remove-etcd-data-directory } ############################################################################### diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index 546e687f9..f9ac4f65e 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -93,8 +93,9 @@ func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.Sn EmbeddedEtcdQuotaBytes: options.Config.EmbeddedEtcdQuotaBytes, SnapstoreConfig: snapstoreConfig, }, - Logger: logger, - ZapLogger: zapLogger, + OriginalClusterSize: options.OriginalClusterSize, + Logger: logger, + ZapLogger: zapLogger, }, Logger: logger, } diff --git a/pkg/initializer/validator/datavalidator.go b/pkg/initializer/validator/datavalidator.go index f673dbfb9..e13463698 100644 --- a/pkg/initializer/validator/datavalidator.go +++ b/pkg/initializer/validator/datavalidator.go @@ -141,6 +141,11 @@ func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, err return DataDirectoryValid, nil } + if d.OriginalClusterSize > 1 { + d.Logger.Info("Skipping check for revision consistency of etcd member as it will get in sync with etcd leader.") + return DataDirectoryValid, nil + } + etcdRevision, err := getLatestEtcdRevision(d.backendPath()) if err != nil && errors.Is(err, bolt.ErrTimeout) { d.Logger.Errorf("another etcd process is using %v and holds the file lock", d.backendPath()) @@ -154,7 +159,7 @@ func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, err etcdRevisionStatus, latestSnapshotRevision, err := d.checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision) // if etcd revision is inconsistent with latest snapshot revision then - // check the etcd revision consistency by starting an embedded etcd since the WALs file can have uncommited data which it was unable to flush to Bolt DB. + // check the etcd revision consistency by starting an embedded etcd since the WALs file can have uncommited data which it was unable to flush to Bolt DB if etcdRevisionStatus == RevisionConsistencyError { d.Logger.Info("Checking for Full revision consistency...") fullRevisionConsistencyStatus, err := d.checkFullRevisionConsistency(dataDir, latestSnapshotRevision) diff --git a/pkg/initializer/validator/types.go b/pkg/initializer/validator/types.go index 9f34ea367..2fad84f7e 100644 --- a/pkg/initializer/validator/types.go +++ b/pkg/initializer/validator/types.go @@ -74,9 +74,10 @@ type Config struct { // DataValidator contains implements Validator interface to perform data validation. type DataValidator struct { - Config *Config - Logger *logrus.Logger - ZapLogger *zap.Logger + Config *Config + OriginalClusterSize int + Logger *logrus.Logger + ZapLogger *zap.Logger } // Validator is the interface for data validation actions. diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go index 8183f42e2..69f807762 100644 --- a/pkg/server/backuprestoreserver.go +++ b/pkg/server/backuprestoreserver.go @@ -19,6 +19,7 @@ import ( "fmt" "net" "net/http" + "os" "sync" "sync/atomic" "time" @@ -28,6 +29,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/leaderelection" "github.com/gardener/etcd-backup-restore/pkg/metrics" brtypes "github.com/gardener/etcd-backup-restore/pkg/types" + "github.com/ghodss/yaml" "github.com/gardener/etcd-backup-restore/pkg/defragmentor" "github.com/gardener/etcd-backup-restore/pkg/errors" @@ -56,7 +58,8 @@ type BackupRestoreServer struct { var ( // runServerWithSnapshotter indicates whether to start server with or without snapshotter. - runServerWithSnapshotter bool = true + runServerWithSnapshotter bool = true + etcdConfig string = "/var/etcd/config/etcd.conf.yaml" ) // NewBackupRestoreServer return new backup restore server. @@ -88,6 +91,35 @@ func NewBackupRestoreServer(logger *logrus.Logger, config *BackupRestoreComponen // Run starts the backup restore server. func (b *BackupRestoreServer) Run(ctx context.Context) error { + var inputFileName string + var err error + + // (For testing purpose) If no ETCD_CONF variable set as environment variable, then consider backup-restore server is not used for tests. + // For tests or to run backup-restore server as standalone, user needs to set ETCD_CONF variable with proper location of ETCD config yaml + etcdConfigForTest := os.Getenv("ETCD_CONF") + if etcdConfigForTest != "" { + inputFileName = etcdConfigForTest + } else { + inputFileName = etcdConfig + } + + configYML, err := os.ReadFile(inputFileName) + if err != nil { + b.logger.Fatalf("Unable to read etcd config file: %v", err) + return err + } + + config := map[string]interface{}{} + if err := yaml.Unmarshal([]byte(configYML), &config); err != nil { + b.logger.Fatalf("Unable to unmarshal etcd config yaml file: %v", err) + return err + } + + initialClusterMap, err := types.NewURLsMap(fmt.Sprint(config["initial-cluster"])) + if err != nil { + b.logger.Fatal("Please provide initial cluster value for embedded ETCD") + } + clusterURLsMap, err := types.NewURLsMap(b.config.RestorationConfig.InitialCluster) if err != nil { // Ideally this case should not occur, since this check is done at the config validations. @@ -101,9 +133,10 @@ func (b *BackupRestoreServer) Run(ctx context.Context) error { } options := &brtypes.RestoreOptions{ - Config: b.config.RestorationConfig, - ClusterURLs: clusterURLsMap, - PeerURLs: peerURLs, + Config: b.config.RestorationConfig, + ClusterURLs: clusterURLsMap, + OriginalClusterSize: len(initialClusterMap), + PeerURLs: peerURLs, } if b.config.SnapstoreConfig == nil || len(b.config.SnapstoreConfig.Provider) == 0 { diff --git a/pkg/types/restorer.go b/pkg/types/restorer.go index 259cfa2ed..e1baab9d8 100644 --- a/pkg/types/restorer.go +++ b/pkg/types/restorer.go @@ -47,7 +47,9 @@ type NewClientFactoryFunc func(cfg EtcdConnectionConfig) client.Factory type RestoreOptions struct { Config *RestorationConfig ClusterURLs types.URLsMap - PeerURLs types.URLs + // OriginalClusterSize indicates the actual cluster size from the ETCD config + OriginalClusterSize int + PeerURLs types.URLs // Base full snapshot + delta snapshots to restore from BaseSnapshot *Snapshot DeltaSnapList SnapList diff --git a/test/e2e/integration/cloud_backup_test.go b/test/e2e/integration/cloud_backup_test.go index e0f921a21..7f43bee75 100644 --- a/test/e2e/integration/cloud_backup_test.go +++ b/test/e2e/integration/cloud_backup_test.go @@ -37,13 +37,14 @@ import ( func startEtcd() (*Cmd, *chan error) { errChan := make(chan error) logger := logrus.New() - etcdArgs := []string{"--name=etcd", + etcdArgs := []string{"--name=etcd1", "--advertise-client-urls=http://0.0.0.0:2379", "--listen-client-urls=http://0.0.0.0:2379", + "--initial-cluster=etcd1=http://0.0.0.0:2380", + "--initial-advertise-peer-urls=http://0.0.0.0:2380", "--initial-cluster-state=new", "--initial-cluster-token=new", - "--log-output=stdout", - "--data-dir=" + os.Getenv("ETCD_DATA_DIR")} + "--log-outputs=stdout"} cmdEtcd := &Cmd{ Task: "etcd", Flags: etcdArgs, @@ -68,6 +69,7 @@ func startSnapshotter() (*Cmd, *chan error) { "--garbage-collection-period=30s", "--schedule=*/1 * * * *", "--storage-provider=S3", + "--endpoints=http://0.0.0.0:2379", "--store-container=" + os.Getenv("TEST_ID"), } logger.Info(etcdbrctlArgs) @@ -127,6 +129,29 @@ var _ = Describe("CloudBackup", func() { Container: os.Getenv("TEST_ID"), Prefix: path.Join("v2"), } + // Create and place a ETCD config yaml + outfile := "/tmp/etcd.conf.yaml" + etcdConfigYaml := `# Human-readable name for this member. + name: etcd1 + data-dir: ` + os.Getenv("ETCD_DATA_DIR") + ` + metrics: extensive + snapshot-count: 75000 + enable-v2: false + quota-backend-bytes: 1073741824 + listen-client-urls: http://0.0.0.0:2379 + advertise-client-urls: http://0.0.0.0:2379 + initial-advertise-peer-urls: http://0.0.0.0:2380 + initial-cluster: etcd1=http://0.0.0.0:2380 + initial-cluster-token: new + initial-cluster-state: new + auto-compaction-mode: periodic + auto-compaction-retention: 30m` + + err := os.WriteFile(outfile, []byte(etcdConfigYaml), 0755) + Expect(err).ShouldNot(HaveOccurred()) + os.Setenv("ETCD_CONF", outfile) + // Required as the config file for embedded ETCD fetches ETCD instance name from the POD_NAME variable + os.Setenv("POD_NAME", "etcd1") }) Describe("Regular backups", func() { @@ -241,9 +266,9 @@ var _ = Describe("CloudBackup", func() { dbFilePath := filepath.Join(dataDir, "member", "snap", "db") logger.Infof("db file: %v", dbFilePath) file, err := os.Create(dbFilePath) + Expect(err).ShouldNot(HaveOccurred()) defer file.Close() fileWriter := bufio.NewWriter(file) - Expect(err).ShouldNot(HaveOccurred()) fileWriter.Write([]byte("corrupt file..")) fileWriter.Flush() zapLogger, _ := zap.NewProduction() @@ -354,19 +379,19 @@ var _ = Describe("CloudBackup", func() { Expect(status).Should(Equal("New")) Expect(err).ShouldNot(HaveOccurred()) time.Sleep(10 * time.Second) - // Stop etcd. - cmdEtcd.StopProcess() - time.Sleep(10 * time.Second) // Corrupt directory testDataCorruptionRestoration := func() { + // Stop etcd. + cmdEtcd.StopProcess() + time.Sleep(10 * time.Second) dataDir := os.Getenv("ETCD_DATA_DIR") dbFilePath := filepath.Join(dataDir, "member", "snap", "db") logger.Infof("db file: %v", dbFilePath) file, err := os.Create(dbFilePath) + Expect(err).ShouldNot(HaveOccurred()) defer file.Close() fileWriter := bufio.NewWriter(file) - Expect(err).ShouldNot(HaveOccurred()) fileWriter.Write([]byte("corrupt file..")) fileWriter.Flush() status, err = getEtcdBrServerStatus()