[release-3.6] Backport DowngradeInfo proto change #19471

Merged · 5 commits · Feb 23, 2025
Changes from all commits
17 changes: 17 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
@@ -2135,6 +2135,19 @@
}
}
},
"etcdserverpbDowngradeInfo": {
"type": "object",
"properties": {
"enabled": {
"type": "boolean",
"description": "enabled indicates whether the cluster is enabled to downgrade."
},
"targetVersion": {
"type": "string",
"description": "targetVersion is the target downgrade version."
}
}
},
"etcdserverpbDowngradeRequest": {
"type": "object",
"properties": {
@@ -2840,6 +2853,10 @@
"type": "string",
"format": "int64",
"title": "dbSizeQuota is the configured etcd storage quota in bytes (the value passed to etcd instance by flag --quota-backend-bytes)"
},
"downgradeInfo": {
"$ref": "#/definitions/etcdserverpbDowngradeInfo",
"description": "downgradeInfo indicates if there is downgrade process."
}
}
},
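For illustration only: with these Swagger definitions, a maintenance status response that carries the new object might look roughly like the fragment below. This is a sketch — the field values and the presence of an in-progress downgrade are assumptions, and unrelated fields are omitted.

```json
{
  "version": "3.6.0",
  "dbSizeQuota": "2147483648",
  "downgradeInfo": {
    "enabled": true,
    "targetVersion": "3.5"
  }
}
```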
933 changes: 610 additions & 323 deletions api/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions api/etcdserverpb/rpc.proto
@@ -1198,6 +1198,15 @@ message StatusResponse {
string storageVersion = 11 [(versionpb.etcd_version_field)="3.6"];
// dbSizeQuota is the configured etcd storage quota in bytes (the value passed to etcd instance by flag --quota-backend-bytes)
int64 dbSizeQuota = 12 [(versionpb.etcd_version_field)="3.6"];
// downgradeInfo indicates if there is downgrade process.
DowngradeInfo downgradeInfo = 13 [(versionpb.etcd_version_field)="3.6"];
}

message DowngradeInfo {
// enabled indicates whether the cluster is enabled to downgrade.
bool enabled = 1;
// targetVersion is the target downgrade version.
string targetVersion = 2;
}

message AuthEnableRequest {
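As a minimal client-side sketch (an illustration, not part of this change): with a v3.6 client and server, the new field can be read straight off the maintenance Status response. The endpoint address is a placeholder, and the nil guard covers servers that predate this field.

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Sketch: query one endpoint's status and print the new downgrade fields.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := cli.Status(ctx, "127.0.0.1:2379")
	if err != nil {
		panic(err)
	}

	// DowngradeInfo is nil when the server does not report it, so guard the access.
	if di := resp.DowngradeInfo; di != nil {
		fmt.Printf("downgrade enabled=%v target=%q\n", di.Enabled, di.TargetVersion)
	} else {
		fmt.Println("server does not report downgrade info")
	}
}
```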
5 changes: 4 additions & 1 deletion etcdctl/ctlv3/command/printer.go
@@ -17,6 +17,7 @@
import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/dustin/go-humanize"
@@ -220,7 +221,7 @@
func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]string) {
hdr = []string{
"endpoint", "ID", "version", "storage version", "db size", "in use", "percentage not in use", "quota", "is leader", "is learner", "raft term",
"raft index", "raft applied index", "errors",
"raft index", "raft applied index", "errors", "downgrade target version", "downgrade enabled",

Codecov / codecov/patch: added line etcdctl/ctlv3/command/printer.go#L224 was not covered by tests.
}
for _, status := range statusList {
rows = append(rows, []string{
Expand All @@ -238,6 +239,8 @@
fmt.Sprint(status.Resp.RaftIndex),
fmt.Sprint(status.Resp.RaftAppliedIndex),
fmt.Sprint(strings.Join(status.Resp.Errors, ", ")),
status.Resp.DowngradeInfo.GetTargetVersion(),
strconv.FormatBool(status.Resp.DowngradeInfo.GetEnabled()),

Codecov / codecov/patch: added lines etcdctl/ctlv3/command/printer.go#L242-L243 were not covered by tests.
})
}
return hdr, rows
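For orientation, the tail of an `etcdctl endpoint status -w table` row during an active downgrade would gain two columns roughly like this (illustrative values; the leading columns are abridged):

```
... | ERRORS | DOWNGRADE TARGET VERSION | DOWNGRADE ENABLED |
... |        | 3.5                      | true              |
```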
2 changes: 2 additions & 0 deletions etcdctl/ctlv3/command/printer_fields.go
@@ -203,6 +203,8 @@
fmt.Println(`"RaftAppliedIndex" :`, ep.Resp.RaftAppliedIndex)
fmt.Println(`"Errors" :`, ep.Resp.Errors)
fmt.Printf("\"Endpoint\" : %q\n", ep.Ep)
fmt.Printf("\"DowngradeTargetVersion\" : %q\n", ep.Resp.DowngradeInfo.GetTargetVersion())
fmt.Println(`"DowngradeEnabled" :`, ep.Resp.DowngradeInfo.GetEnabled())

Codecov / codecov/patch: added lines etcdctl/ctlv3/command/printer_fields.go#L206-L207 were not covered by tests.
fmt.Println()
}
}
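With `-w fields` output, the same data shows up as two extra lines, roughly (illustrative values):

```
"DowngradeTargetVersion" : "3.5"
"DowngradeEnabled" : true
```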
4 changes: 4 additions & 0 deletions scripts/etcd_version_annotations.txt
@@ -150,6 +150,9 @@ etcdserverpb.DeleteRangeResponse: "3.0"
etcdserverpb.DeleteRangeResponse.deleted: ""
etcdserverpb.DeleteRangeResponse.header: ""
etcdserverpb.DeleteRangeResponse.prev_kvs: "3.1"
etcdserverpb.DowngradeInfo: ""
etcdserverpb.DowngradeInfo.enabled: ""
etcdserverpb.DowngradeInfo.targetVersion: ""
etcdserverpb.DowngradeRequest: "3.5"
etcdserverpb.DowngradeRequest.CANCEL: ""
etcdserverpb.DowngradeRequest.DowngradeAction: "3.5"
@@ -382,6 +385,7 @@ etcdserverpb.StatusResponse: "3.0"
etcdserverpb.StatusResponse.dbSize: ""
etcdserverpb.StatusResponse.dbSizeInUse: "3.4"
etcdserverpb.StatusResponse.dbSizeQuota: "3.6"
etcdserverpb.StatusResponse.downgradeInfo: "3.6"
etcdserverpb.StatusResponse.errors: "3.4"
etcdserverpb.StatusResponse.header: ""
etcdserverpb.StatusResponse.isLearner: "3.4"
7 changes: 7 additions & 0 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -262,10 +262,17 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
DbSizeInUse: ms.bg.Backend().SizeInUse(),
IsLearner: ms.cs.IsLearner(),
DbSizeQuota: ms.cg.Config().QuotaBackendBytes,
DowngradeInfo: &pb.DowngradeInfo{Enabled: false},
}
if storageVersion := ms.vs.GetStorageVersion(); storageVersion != nil {
resp.StorageVersion = storageVersion.String()
}
if downgradeInfo := ms.vs.GetDowngradeInfo(); downgradeInfo != nil {
resp.DowngradeInfo = &pb.DowngradeInfo{
Enabled: downgradeInfo.Enabled,
TargetVersion: downgradeInfo.TargetVersion,
}
}
if resp.Leader == raft.None {
resp.Errors = append(resp.Errors, errors.ErrNoLeader.Error())
}
48 changes: 24 additions & 24 deletions tests/e2e/cluster_downgrade_test.go
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types"
@@ -51,6 +52,10 @@ func TestDowngradeUpgradeClusterOf1(t *testing.T) {
testDowngradeUpgrade(t, 1, 1, false, noCancellation)
}

func TestDowngradeUpgrade2InClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 2, 3, false, noCancellation)
}

func TestDowngradeUpgradeClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 3, 3, false, noCancellation)
}
@@ -128,6 +133,9 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
time.Sleep(etcdserver.HealthInterval)
}

t.Log("Downgrade should be disabled")
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: false})

t.Log("Adding member to test membership, but a learner avoid breaking quorum")
resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
require.NoError(t, err)
@@ -150,6 +158,10 @@
return // No need to perform downgrading, end the test here
}
e2e.DowngradeEnable(t, epc, lastVersion)

t.Log("Downgrade should be enabled")
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: true, TargetVersion: lastClusterVersion.String()})

if triggerCancellation == cancelRightAfterEnable {
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
e2e.DowngradeCancel(t, epc)
@@ -162,10 +174,10 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
t.Logf("Elect members for operations on members: %v", membersToChange)

t.Logf("Starting downgrade process to %q", lastVersionStr)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, currentVersion, lastClusterVersion)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, true, currentVersion, lastClusterVersion)
require.NoError(t, err)
if len(membersToChange) == len(epc.Procs) {
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
e2e.AssertProcessLogs(t, epc.Procs[epc.WaitLeader(t)], "the cluster has been downgraded")
}

t.Log("Downgrade complete")
@@ -198,10 +210,19 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
beforeMembers, beforeKV = getMembersAndKeys(t, cc)

t.Logf("Starting upgrade process to %q", currentVersionStr)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, lastClusterVersion, currentVersion)
downgradeEnabled := triggerCancellation == noCancellation && numberOfMembersToDowngrade < clusterSize
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, downgradeEnabled, lastClusterVersion, currentVersion)
require.NoError(t, err)
t.Log("Upgrade complete")

if downgradeEnabled {
t.Log("Downgrade should be still enabled")
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: true, TargetVersion: lastClusterVersion.String()})
} else {
t.Log("Downgrade should be disabled")
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: false})
}

afterMembers, afterKV = getMembersAndKeys(t, cc)
assert.Equal(t, beforeKV.Kvs, afterKV.Kvs)
assert.Equal(t, beforeMembers.Members, afterMembers.Members)
@@ -224,27 +245,6 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr
return epc
}

func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
for i := 0; i < len(epc.Procs); i++ {
endpoints := epc.Procs[i].EndpointsGRPC()
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
defer cli.Close()
resp, err := cli.Status(ctx, endpoints[0])
require.NoError(t, err)
if resp.Header.GetMemberId() == resp.Leader {
return epc.Procs[i]
}
}
t.Fatal("Leader not found")
return nil
}

func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
16 changes: 16 additions & 0 deletions tests/framework/e2e/cluster.go
@@ -28,6 +28,7 @@ import (
"testing"
"time"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

@@ -711,6 +712,21 @@ func (cfg *EtcdProcessClusterConfig) binaryPath(i int) string {
return execPath
}

func (epc *EtcdProcessCluster) MinServerVersion() (*semver.Version, error) {
var minVersion *semver.Version
for _, member := range epc.Procs {
ver, err := GetVersionFromBinary(member.Config().ExecPath)
if err != nil {
return nil, fmt.Errorf("failed to get version from member %s binary: %w", member.Config().Name, err)
}

if minVersion == nil || ver.LessThan(*minVersion) {
minVersion = ver
}
}
return minVersion, nil
}

func values(cfg embed.Config) map[string]string {
fs := flag.NewFlagSet("etcd", flag.ContinueOnError)
cfg.AddFlags(fs)
79 changes: 73 additions & 6 deletions tests/framework/e2e/downgrade.go
@@ -27,7 +27,9 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

@@ -46,7 +48,6 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
Server: OffsetMinor(ver, 1).String(),
Storage: ver.String(),
})
AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
}

t.Log("Cluster is ready for downgrade")
Expand Down Expand Up @@ -82,14 +83,59 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
t.Log("Cluster downgrade cancellation is completed")
}

func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
func ValidateDowngradeInfo(t *testing.T, clus *EtcdProcessCluster, expected *pb.DowngradeInfo) {
cfg := clus.Cfg

for i := 0; i < len(clus.Procs); i++ {
member := clus.Procs[i]
mc := member.Etcdctl()
mName := member.Config().Name

testutils.ExecuteWithTimeout(t, 1*time.Minute, func() {
for {
statuses, err := mc.Status(context.Background())
if err != nil {
cfg.Logger.Warn("failed to get member status and retrying",
zap.Error(err),
zap.String("member", mName))

time.Sleep(time.Second)
continue
}

require.Lenf(t, statuses, 1, "member %s", mName)
got := (*pb.StatusResponse)(statuses[0]).GetDowngradeInfo()

if got.GetEnabled() == expected.GetEnabled() && got.GetTargetVersion() == expected.GetTargetVersion() {
cfg.Logger.Info("DowngradeInfo match", zap.String("member", mName))
break
}

cfg.Logger.Warn("DowngradeInfo didn't match retrying",
zap.String("member", mName),
zap.Dict("expected",
zap.Bool("Enabled", expected.GetEnabled()),
zap.String("TargetVersion", expected.GetTargetVersion()),
),
zap.Dict("got",
zap.Bool("Enabled", got.GetEnabled()),
zap.String("TargetVersion", got.GetTargetVersion()),
),
)
time.Sleep(time.Second)
}
})
}
}

func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, downgradeEnabled bool, currentVersion, targetVersion *semver.Version) error {
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
t.Logf("Elect members for operations on members: %v", membersToChange)

return DowngradeUpgradeMembersByID(t, lg, clus, membersToChange, currentVersion, targetVersion)
return DowngradeUpgradeMembersByID(t, lg, clus, membersToChange, downgradeEnabled, currentVersion, targetVersion)
}

func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, membersToChange []int, currentVersion, targetVersion *semver.Version) error {
func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, membersToChange []int, downgradeEnabled bool, currentVersion, targetVersion *semver.Version) error {
if lg == nil {
lg = clus.lg
}
@@ -100,7 +146,6 @@ func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcess
opString = "downgrading"
newExecPath = BinPath.EtcdLastRelease
}

for _, memberID := range membersToChange {
member := clus.Procs[memberID]
if member.Config().ExecPath == newExecPath {
@@ -117,11 +162,33 @@
return err
}
}

t.Log("Waiting health interval to make sure the leader propagates version to new processes")
time.Sleep(etcdserver.HealthInterval)

lg.Info("Validating versions")
clusterVersion := targetVersion
if !isDowngrade {
if downgradeEnabled {
// If the downgrade isn't cancelled yet, then the cluster
// version will always stay at the lower version, no matter
// what's the binary version of each member.
clusterVersion = currentVersion
} else {
// If the downgrade has already been cancelled, then the
// cluster version is the minimal server version.
minVer, err := clus.MinServerVersion()
if err != nil {
return fmt.Errorf("failed to get min server version: %w", err)
}
clusterVersion = minVer
}
}

for _, memberID := range membersToChange {
member := clus.Procs[memberID]
ValidateVersion(t, clus.Cfg, member, version.Versions{
Cluster: targetVersion.String(),
Cluster: clusterVersion.String(),
Server: targetVersion.String(),
})
}
Expand Down
Loading