From 0340c6096f5f0861a3c5410fba465847d0fc8f3e Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Mon, 22 Jul 2024 10:15:59 -0400 Subject: [PATCH] Revert "Merge pull request #17425 from ivanvc/release-3.5-backport-ignore-old-leaders-leases-revoking-request" This reverts commit 9ffba74e6688f92f628f68a1720781ed5be128dd, reversing changes made to c21317739433fabb754a9b4630a24b8104ff89b0. --- server/etcdserver/server.go | 44 +------- server/etcdserver/v3_server.go | 11 -- server/lease/lessor.go | 10 +- server/lease/lessor_test.go | 6 +- tests/e2e/v3_lease_no_proxy_test.go | 157 ---------------------------- tests/framework/e2e/cluster.go | 8 -- tests/integration/v3_lease_test.go | 2 +- 7 files changed, 7 insertions(+), 231 deletions(-) delete mode 100644 tests/e2e/v3_lease_no_proxy_test.go diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index aca92dd8e49..0095b6ec5ca 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -669,15 +669,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { if srv.Cfg.EnableLeaseCheckpoint { // setting checkpointer enables lease checkpoint feature. - srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error { - if !srv.ensureLeadership() { - srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader", - zap.Uint64("local-member-id", uint64(srv.ID()))) - return lease.ErrNotPrimary - } - + srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp}) - return nil }) } @@ -1161,19 +1154,7 @@ func (s *EtcdServer) run() { func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { s.GoAttach(func() { - // We shouldn't revoke any leases if current member isn't a leader, - // because the operation should only be performed by the leader. When - // the leader gets blocked on the raft loop, such as writing WAL entries, - // it can't process any events or messages from raft. It may think it - // is still the leader even the leader has already changed. - // Refer to https://github.com/etcd-io/etcd/issues/15247 lg := s.Logger() - if !s.ensureLeadership() { - lg.Warn("Ignore the lease revoking request because current member isn't a leader", - zap.Uint64("local-member-id", uint64(s.ID()))) - return - } - // Increases throughput of expired leases deletion process through parallelization c := make(chan struct{}, maxPendingRevokes) for _, curLease := range leases { @@ -1206,29 +1187,6 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { }) } -// ensureLeadership checks whether current member is still the leader. -func (s *EtcdServer) ensureLeadership() bool { - lg := s.Logger() - - ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) - defer cancel() - if err := s.linearizableReadNotify(ctx); err != nil { - lg.Warn("Failed to check current member's leadership", - zap.Error(err)) - return false - } - - newLeaderId := s.raftStatus().Lead - if newLeaderId != uint64(s.ID()) { - lg.Warn("Current member isn't a leader", - zap.Uint64("local-member-id", uint64(s.ID())), - zap.Uint64("new-lead", newLeaderId)) - return false - } - - return true -} - // Cleanup removes allocated objects by EtcdServer.NewServer in // situation that EtcdServer::Start was not called (that takes care of cleanup). func (s *EtcdServer) Cleanup() { diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index b6e7a806797..9f69b86b9b1 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -297,17 +297,6 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { if s.isLeader() { - // If s.isLeader() returns true, but we fail to ensure the current - // member's leadership, there are a couple of possibilities: - // 1. current member gets stuck on writing WAL entries; - // 2. current member is in network isolation status; - // 3. current member isn't a leader anymore (possibly due to #1 above). - // In such case, we just return error to client, so that the client can - // switch to another member to continue the lease keep-alive operation. - if !s.ensureLeadership() { - return -1, lease.ErrNotPrimary - } - if err := s.waitAppliedIndex(); err != nil { return 0, err } diff --git a/server/lease/lessor.go b/server/lease/lessor.go index abeeb09bf40..ff9cb2ca5e6 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -77,7 +77,7 @@ type RangeDeleter func() TxnDelete // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to // avoid circular dependency with mvcc. -type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error +type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) type LeaseID int64 @@ -423,9 +423,7 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { // By applying a RAFT entry only when the remainingTTL is already set, we limit the number // of RAFT entries written per lease to a max of 2 per checkpoint interval. if clearRemainingTTL { - if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil { - return -1, err - } + le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}) } le.mu.Lock() @@ -661,9 +659,7 @@ func (le *lessor) checkpointScheduledLeases() { le.mu.Unlock() if len(cps) != 0 { - if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil { - return - } + le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}) } if len(cps) < maxLeaseCheckpointBatchSize { return diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 06c2a8a9664..0edebdadde8 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -250,11 +250,10 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer os.RemoveAll(dir) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) - fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error { + fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) } - return nil } defer le.Stop() // Set checkpointer @@ -541,7 +540,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer le.Stop() le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) - le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error { + le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { close(checkpointedC) if len(lc.Checkpoints) != 1 { t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints)) @@ -550,7 +549,6 @@ func TestLessorCheckpointScheduling(t *testing.T) { if c.Remaining_TTL != 1 { t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL) } - return nil }) _, err := le.Grant(1, 2) if err != nil { diff --git a/tests/e2e/v3_lease_no_proxy_test.go b/tests/e2e/v3_lease_no_proxy_test.go deleted file mode 100644 index eb0a5465703..00000000000 --- a/tests/e2e/v3_lease_no_proxy_test.go +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright 2024 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !cluster_proxy - -package e2e - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/e2e" - "go.etcd.io/etcd/tests/v3/framework/testutils" -) - -// TestLeaseRevoke_IgnoreOldLeader verifies that leases shouldn't be revoked -// by old leader. -// See the case 1 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093. -func TestLeaseRevoke_IgnoreOldLeader(t *testing.T) { - testLeaseRevokeIssue(t, true) -} - -// TestLeaseRevoke_ClientSwitchToOtherMember verifies that leases shouldn't -// be revoked by new leader. -// See the case 2 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093. -func TestLeaseRevoke_ClientSwitchToOtherMember(t *testing.T) { - testLeaseRevokeIssue(t, false) -} - -func testLeaseRevokeIssue(t *testing.T, connectToOneFollower bool) { - e2e.BeforeTest(t) - - ctx := context.Background() - - t.Log("Starting a new etcd cluster") - epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ - ClusterSize: 3, - GoFailEnabled: true, - GoFailClientTimeout: 40 * time.Second, - }) - require.NoError(t, err) - defer func() { - if errC := epc.Close(); errC != nil { - t.Fatalf("error closing etcd processes (%v)", errC) - } - }() - - leaderIdx := epc.WaitLeader(t) - t.Logf("Leader index: %d", leaderIdx) - - epsForNormalOperations := epc.Procs[(leaderIdx+2)%3].EndpointsGRPC() - t.Logf("Creating a client for normal operations: %v", epsForNormalOperations) - client, err := clientv3.New(clientv3.Config{Endpoints: epsForNormalOperations, DialTimeout: 3 * time.Second}) - require.NoError(t, err) - defer client.Close() - - var epsForLeaseKeepAlive []string - if connectToOneFollower { - epsForLeaseKeepAlive = epc.Procs[(leaderIdx+1)%3].EndpointsGRPC() - } else { - epsForLeaseKeepAlive = epc.EndpointsGRPC() - } - t.Logf("Creating a client for the leaseKeepAlive operation: %v", epsForLeaseKeepAlive) - clientForKeepAlive, err := clientv3.New(clientv3.Config{Endpoints: epsForLeaseKeepAlive, DialTimeout: 3 * time.Second}) - require.NoError(t, err) - defer clientForKeepAlive.Close() - - resp, err := client.Status(ctx, epsForNormalOperations[0]) - require.NoError(t, err) - oldLeaderId := resp.Leader - - t.Log("Creating a new lease") - leaseRsp, err := client.Grant(ctx, 20) - require.NoError(t, err) - - t.Log("Starting a goroutine to keep alive the lease") - doneC := make(chan struct{}) - stopC := make(chan struct{}) - startC := make(chan struct{}, 1) - go func() { - defer close(doneC) - - respC, kerr := clientForKeepAlive.KeepAlive(ctx, leaseRsp.ID) - require.NoError(t, kerr) - // ensure we have received the first response from the server - <-respC - startC <- struct{}{} - - for { - select { - case <-stopC: - return - case <-respC: - } - } - }() - - t.Log("Wait for the keepAlive goroutine to get started") - <-startC - - t.Log("Trigger the failpoint to simulate stalled writing") - err = epc.Procs[leaderIdx].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("30s")`) - require.NoError(t, err) - - cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - t.Logf("Waiting for a new leader to be elected, old leader index: %d, old leader ID: %d", leaderIdx, oldLeaderId) - testutils.ExecuteUntil(cctx, t, func() { - for { - resp, err = client.Status(ctx, epsForNormalOperations[0]) - if err == nil && resp.Leader != oldLeaderId { - t.Logf("A new leader has already been elected, new leader index: %d", resp.Leader) - return - } - time.Sleep(100 * time.Millisecond) - } - }) - cancel() - - t.Log("Writing a key/value pair") - _, err = client.Put(ctx, "foo", "bar") - require.NoError(t, err) - - t.Log("Sleeping 30 seconds") - time.Sleep(30 * time.Second) - - t.Log("Remove the failpoint 'raftBeforeSave'") - err = epc.Procs[leaderIdx].Failpoints().DeactivateHTTP(ctx, "raftBeforeSave") - require.NoError(t, err) - - // By default, etcd tries to revoke leases every 7 seconds. - t.Log("Sleeping 10 seconds") - time.Sleep(10 * time.Second) - - t.Log("Confirming the lease isn't revoked") - leases, err := client.Leases(ctx) - require.NoError(t, err) - require.Equal(t, 1, len(leases.Leases)) - - t.Log("Waiting for the keepAlive goroutine to exit") - close(stopC) - <-doneC -} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index fbe932ded2f..61828c335dd 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -484,14 +484,6 @@ func (epc *EtcdProcessCluster) EndpointsV3() []string { return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsV3() }) } -func (epc *EtcdProcessCluster) EndpointsGRPC() []string { - return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsGRPC() }) -} - -func (epc *EtcdProcessCluster) EndpointsHTTP() []string { - return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsHTTP() }) -} - func (epc *EtcdProcessCluster) Endpoints(f func(ep EtcdProcess) []string) (ret []string) { for _, p := range epc.Procs { ret = append(ret, f(p)...) diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index e11c6f24692..27e8621ff13 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -757,7 +757,7 @@ func TestV3LeaseFailover(t *testing.T) { // send keep alive to old leader until the old leader starts // to drop lease request. - expectedExp := time.Now().Add(5 * time.Second) + var expectedExp time.Time for { if err = lac.Send(lreq); err != nil { break