Skip to content

Commit

Permalink
Merge pull request #482 from abdasgupta/single_restore
Browse files Browse the repository at this point in the history
Added logic for single node restoration after abrupt shutdown.
  • Loading branch information
abdasgupta authored Jun 6, 2022
2 parents 82d1d17 + d79a491 commit 3f475fe
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 20 deletions.
8 changes: 7 additions & 1 deletion .ci/integration_test
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ function setup_ginkgo() {

function setup_etcd(){
echo "Downloading and installing etcd..."
export ETCD_VER=v3.3.8
export ETCD_VER=v3.4.13
if [[ $(uname) == 'Darwin' ]]; then
curl -L https://storage.googleapis.com/etcd/${ETCD_VER}/etcd-${ETCD_VER}-darwin-amd64.zip -o etcd-${ETCD_VER}-darwin-amd64.zip
unzip etcd-${ETCD_VER}-darwin-amd64.zip
Expand Down Expand Up @@ -196,6 +196,11 @@ function cleanup-aws-infrastructure() {
echo "AWS infrastructure cleanup completed."
}

function remove-etcd-data-directory() {
echo "Removing ETCD Data Directory"
rm -rf ${ETCD_DATA_DIR}
}

#############################
# Azure Setup #
#############################
Expand Down Expand Up @@ -229,6 +234,7 @@ function setup_test_cluster() {

function cleanup_test_environment() {
cleanup-aws-infrastructure
remove-etcd-data-directory
}

###############################################################################
Expand Down
5 changes: 3 additions & 2 deletions pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.Sn
EmbeddedEtcdQuotaBytes: options.Config.EmbeddedEtcdQuotaBytes,
SnapstoreConfig: snapstoreConfig,
},
Logger: logger,
ZapLogger: zapLogger,
OriginalClusterSize: options.OriginalClusterSize,
Logger: logger,
ZapLogger: zapLogger,
},
Logger: logger,
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/initializer/validator/datavalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, err
return DataDirectoryValid, nil
}

if d.OriginalClusterSize > 1 {
d.Logger.Info("Skipping check for revision consistency of etcd member as it will get in sync with etcd leader.")
return DataDirectoryValid, nil
}

etcdRevision, err := getLatestEtcdRevision(d.backendPath())
if err != nil && errors.Is(err, bolt.ErrTimeout) {
d.Logger.Errorf("another etcd process is using %v and holds the file lock", d.backendPath())
Expand All @@ -154,7 +159,7 @@ func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, err
etcdRevisionStatus, latestSnapshotRevision, err := d.checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision)

// if etcd revision is inconsistent with latest snapshot revision then
// check the etcd revision consistency by starting an embedded etcd since the WALs file can have uncommited data which it was unable to flush to Bolt DB.
// check the etcd revision consistency by starting an embedded etcd since the WALs file can have uncommited data which it was unable to flush to Bolt DB
if etcdRevisionStatus == RevisionConsistencyError {
d.Logger.Info("Checking for Full revision consistency...")
fullRevisionConsistencyStatus, err := d.checkFullRevisionConsistency(dataDir, latestSnapshotRevision)
Expand Down
7 changes: 4 additions & 3 deletions pkg/initializer/validator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ type Config struct {

// DataValidator contains implements Validator interface to perform data validation.
type DataValidator struct {
Config *Config
Logger *logrus.Logger
ZapLogger *zap.Logger
Config *Config
OriginalClusterSize int
Logger *logrus.Logger
ZapLogger *zap.Logger
}

// Validator is the interface for data validation actions.
Expand Down
41 changes: 37 additions & 4 deletions pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/gardener/etcd-backup-restore/pkg/leaderelection"
"github.com/gardener/etcd-backup-restore/pkg/metrics"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/ghodss/yaml"

"github.com/gardener/etcd-backup-restore/pkg/defragmentor"
"github.com/gardener/etcd-backup-restore/pkg/errors"
Expand Down Expand Up @@ -56,7 +58,8 @@ type BackupRestoreServer struct {

var (
// runServerWithSnapshotter indicates whether to start server with or without snapshotter.
runServerWithSnapshotter bool = true
runServerWithSnapshotter bool = true
etcdConfig string = "/var/etcd/config/etcd.conf.yaml"
)

// NewBackupRestoreServer return new backup restore server.
Expand Down Expand Up @@ -88,6 +91,35 @@ func NewBackupRestoreServer(logger *logrus.Logger, config *BackupRestoreComponen

// Run starts the backup restore server.
func (b *BackupRestoreServer) Run(ctx context.Context) error {
var inputFileName string
var err error

// (For testing purpose) If no ETCD_CONF variable set as environment variable, then consider backup-restore server is not used for tests.
// For tests or to run backup-restore server as standalone, user needs to set ETCD_CONF variable with proper location of ETCD config yaml
etcdConfigForTest := os.Getenv("ETCD_CONF")
if etcdConfigForTest != "" {
inputFileName = etcdConfigForTest
} else {
inputFileName = etcdConfig
}

configYML, err := os.ReadFile(inputFileName)
if err != nil {
b.logger.Fatalf("Unable to read etcd config file: %v", err)
return err
}

config := map[string]interface{}{}
if err := yaml.Unmarshal([]byte(configYML), &config); err != nil {
b.logger.Fatalf("Unable to unmarshal etcd config yaml file: %v", err)
return err
}

initialClusterMap, err := types.NewURLsMap(fmt.Sprint(config["initial-cluster"]))
if err != nil {
b.logger.Fatal("Please provide initial cluster value for embedded ETCD")
}

clusterURLsMap, err := types.NewURLsMap(b.config.RestorationConfig.InitialCluster)
if err != nil {
// Ideally this case should not occur, since this check is done at the config validations.
Expand All @@ -101,9 +133,10 @@ func (b *BackupRestoreServer) Run(ctx context.Context) error {
}

options := &brtypes.RestoreOptions{
Config: b.config.RestorationConfig,
ClusterURLs: clusterURLsMap,
PeerURLs: peerURLs,
Config: b.config.RestorationConfig,
ClusterURLs: clusterURLsMap,
OriginalClusterSize: len(initialClusterMap),
PeerURLs: peerURLs,
}

if b.config.SnapstoreConfig == nil || len(b.config.SnapstoreConfig.Provider) == 0 {
Expand Down
4 changes: 3 additions & 1 deletion pkg/types/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ type NewClientFactoryFunc func(cfg EtcdConnectionConfig) client.Factory
type RestoreOptions struct {
Config *RestorationConfig
ClusterURLs types.URLsMap
PeerURLs types.URLs
// OriginalClusterSize indicates the actual cluster size from the ETCD config
OriginalClusterSize int
PeerURLs types.URLs
// Base full snapshot + delta snapshots to restore from
BaseSnapshot *Snapshot
DeltaSnapList SnapList
Expand Down
41 changes: 33 additions & 8 deletions test/e2e/integration/cloud_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ import (
func startEtcd() (*Cmd, *chan error) {
errChan := make(chan error)
logger := logrus.New()
etcdArgs := []string{"--name=etcd",
etcdArgs := []string{"--name=etcd1",
"--advertise-client-urls=http://0.0.0.0:2379",
"--listen-client-urls=http://0.0.0.0:2379",
"--initial-cluster=etcd1=http://0.0.0.0:2380",
"--initial-advertise-peer-urls=http://0.0.0.0:2380",
"--initial-cluster-state=new",
"--initial-cluster-token=new",
"--log-output=stdout",
"--data-dir=" + os.Getenv("ETCD_DATA_DIR")}
"--log-outputs=stdout"}
cmdEtcd := &Cmd{
Task: "etcd",
Flags: etcdArgs,
Expand All @@ -68,6 +69,7 @@ func startSnapshotter() (*Cmd, *chan error) {
"--garbage-collection-period=30s",
"--schedule=*/1 * * * *",
"--storage-provider=S3",
"--endpoints=http://0.0.0.0:2379",
"--store-container=" + os.Getenv("TEST_ID"),
}
logger.Info(etcdbrctlArgs)
Expand Down Expand Up @@ -127,6 +129,29 @@ var _ = Describe("CloudBackup", func() {
Container: os.Getenv("TEST_ID"),
Prefix: path.Join("v2"),
}
// Create and place a ETCD config yaml
outfile := "/tmp/etcd.conf.yaml"
etcdConfigYaml := `# Human-readable name for this member.
name: etcd1
data-dir: ` + os.Getenv("ETCD_DATA_DIR") + `
metrics: extensive
snapshot-count: 75000
enable-v2: false
quota-backend-bytes: 1073741824
listen-client-urls: http://0.0.0.0:2379
advertise-client-urls: http://0.0.0.0:2379
initial-advertise-peer-urls: http://0.0.0.0:2380
initial-cluster: etcd1=http://0.0.0.0:2380
initial-cluster-token: new
initial-cluster-state: new
auto-compaction-mode: periodic
auto-compaction-retention: 30m`

err := os.WriteFile(outfile, []byte(etcdConfigYaml), 0755)
Expect(err).ShouldNot(HaveOccurred())
os.Setenv("ETCD_CONF", outfile)
// Required as the config file for embedded ETCD fetches ETCD instance name from the POD_NAME variable
os.Setenv("POD_NAME", "etcd1")
})

Describe("Regular backups", func() {
Expand Down Expand Up @@ -241,9 +266,9 @@ var _ = Describe("CloudBackup", func() {
dbFilePath := filepath.Join(dataDir, "member", "snap", "db")
logger.Infof("db file: %v", dbFilePath)
file, err := os.Create(dbFilePath)
Expect(err).ShouldNot(HaveOccurred())
defer file.Close()
fileWriter := bufio.NewWriter(file)
Expect(err).ShouldNot(HaveOccurred())
fileWriter.Write([]byte("corrupt file.."))
fileWriter.Flush()
zapLogger, _ := zap.NewProduction()
Expand Down Expand Up @@ -354,19 +379,19 @@ var _ = Describe("CloudBackup", func() {
Expect(status).Should(Equal("New"))
Expect(err).ShouldNot(HaveOccurred())
time.Sleep(10 * time.Second)
// Stop etcd.
cmdEtcd.StopProcess()
time.Sleep(10 * time.Second)

// Corrupt directory
testDataCorruptionRestoration := func() {
// Stop etcd.
cmdEtcd.StopProcess()
time.Sleep(10 * time.Second)
dataDir := os.Getenv("ETCD_DATA_DIR")
dbFilePath := filepath.Join(dataDir, "member", "snap", "db")
logger.Infof("db file: %v", dbFilePath)
file, err := os.Create(dbFilePath)
Expect(err).ShouldNot(HaveOccurred())
defer file.Close()
fileWriter := bufio.NewWriter(file)
Expect(err).ShouldNot(HaveOccurred())
fileWriter.Write([]byte("corrupt file.."))
fileWriter.Flush()
status, err = getEtcdBrServerStatus()
Expand Down

0 comments on commit 3f475fe

Please sign in to comment.