From a147643d4e08dca098db9d5b7126a32bf11d7696 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Thu, 14 Dec 2023 12:54:36 +0000 Subject: [PATCH] commit transaction for each configuration change Current when etcd remove a member, it just removes it from bbolt db, but doesn't update the cache. On the other hands, etcd periodically commit each bbolt transaction. When etcd processes the next conf change request, it might not have commit the previous member remove request into bbolt. Accordingly it breaks the linerizability. Signed-off-by: Benjamin Wang --- server/etcdserver/server.go | 5 +++++ server/etcdserver/server_test.go | 2 ++ tests/e2e/ctl_v3_member_no_proxy_test.go | 17 +++++++++-------- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index ac5f0c7ab293..ddb6e83e39a7 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1809,6 +1809,11 @@ func (s *EtcdServer) apply( var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3) + if err != nil { + s.lg.Error("failed to apply conf change", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err)) + } else { + s.KV().Commit() + } s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 4b8a12f010e4..ca04c8da4558 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -298,6 +298,7 @@ func newServer(t *testing.T, recorder *nodeRecorder) *EtcdServer { r: *newRaftNode(raftNodeConfig{lg: lg, Node: recorder}), cluster: membership.NewCluster(lg), consistIndex: cindex.NewConsistentIndex(be), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } srv.cluster.SetBackend(schema.NewMembershipBackend(lg, be)) srv.cluster.SetStore(v2store.New()) @@ -475,6 +476,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { w: wait.New(), consistIndex: ci, beHooks: serverstorage.NewBackendHooks(lg, ci), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } // create EntryConfChange entry diff --git a/tests/e2e/ctl_v3_member_no_proxy_test.go b/tests/e2e/ctl_v3_member_no_proxy_test.go index 619ae5d136c1..e6d263b0c222 100644 --- a/tests/e2e/ctl_v3_member_no_proxy_test.go +++ b/tests/e2e/ctl_v3_member_no_proxy_test.go @@ -40,18 +40,17 @@ func TestMemberReplace(t *testing.T) { require.NoError(t, err) defer epc.Close() - memberId := rand.Int() % len(epc.Procs) - member := epc.Procs[memberId] + memberIdx := rand.Int() % len(epc.Procs) + member := epc.Procs[memberIdx] memberName := member.Config().Name var endpoints []string for i := 1; i < len(epc.Procs); i++ { - endpoints = append(endpoints, epc.Procs[(memberId+i)%len(epc.Procs)].EndpointsGRPC()...) + endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...) } cc, err := e2e.NewEtcdctl(epc.Cfg.Client, endpoints) require.NoError(t, err) - c := epc.Etcdctl() - memberID, found, err := getMemberIdByName(ctx, c, memberName) + memberID, found, err := getMemberIdByName(ctx, cc, memberName) require.NoError(t, err) require.Equal(t, found, true, "Member not found") @@ -59,9 +58,9 @@ func TestMemberReplace(t *testing.T) { time.Sleep(etcdserver.HealthInterval) t.Logf("Removing member %s", memberName) - _, err = c.MemberRemove(ctx, memberID) + _, err = cc.MemberRemove(ctx, memberID) require.NoError(t, err) - _, found, err = getMemberIdByName(ctx, c, memberName) + _, found, err = getMemberIdByName(ctx, cc, memberName) require.NoError(t, err) require.Equal(t, found, false, "Expected member to be removed") for member.IsRunning() { @@ -82,12 +81,14 @@ func TestMemberReplace(t *testing.T) { err = patchArgs(member.Config().Args, "initial-cluster-state", "existing") require.NoError(t, err) + // Sleep 100ms to bypass the known issue https://github.com/etcd-io/etcd/issues/16687. + time.Sleep(100 * time.Millisecond) t.Logf("Starting member %s", memberName) err = member.Start(ctx) require.NoError(t, err) testutils.ExecuteUntil(ctx, t, func() { for { - _, found, err := getMemberIdByName(ctx, c, memberName) + _, found, err := getMemberIdByName(ctx, cc, memberName) if err != nil || !found { time.Sleep(10 * time.Millisecond) continue