Skip to content

Commit

Permalink
commit transaction for each configuration change
Browse files Browse the repository at this point in the history
Current when etcd remove a member, it just removes it from bbolt db,
but doesn't update the cache. On the other hands, etcd periodically
commit each bbolt transaction. When etcd processes the next conf
change request, it might not have commit the previous member remove
request into bbolt. Accordingly it breaks the linerizability.

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Dec 14, 2023
1 parent a70fa9b commit 884c9d4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
5 changes: 5 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,11 @@ func (s *EtcdServer) apply(
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
if err != nil {
s.lg.Error("failed to apply conf change", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
} else {
s.KV().Commit()
}
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf
Expand Down
15 changes: 7 additions & 8 deletions tests/e2e/ctl_v3_member_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,27 @@ func TestMemberReplace(t *testing.T) {
require.NoError(t, err)
defer epc.Close()

memberId := rand.Int() % len(epc.Procs)
member := epc.Procs[memberId]
memberIdx := rand.Int() % len(epc.Procs)
member := epc.Procs[memberIdx]
memberName := member.Config().Name
var endpoints []string
for i := 1; i < len(epc.Procs); i++ {
endpoints = append(endpoints, epc.Procs[(memberId+i)%len(epc.Procs)].EndpointsGRPC()...)
endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...)
}
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, endpoints)
require.NoError(t, err)

c := epc.Etcdctl()
memberID, found, err := getMemberIdByName(ctx, c, memberName)
memberID, found, err := getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, true, "Member not found")

// Need to wait health interval for cluster to accept member changes
time.Sleep(etcdserver.HealthInterval)

t.Logf("Removing member %s", memberName)
_, err = c.MemberRemove(ctx, memberID)
_, err = cc.MemberRemove(ctx, memberID)
require.NoError(t, err)
_, found, err = getMemberIdByName(ctx, c, memberName)
_, found, err = getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, false, "Expected member to be removed")
for member.IsRunning() {
Expand All @@ -87,7 +86,7 @@ func TestMemberReplace(t *testing.T) {
require.NoError(t, err)
testutils.ExecuteUntil(ctx, t, func() {
for {
_, found, err := getMemberIdByName(ctx, c, memberName)
_, found, err := getMemberIdByName(ctx, cc, memberName)
if err != nil || !found {
time.Sleep(10 * time.Millisecond)
continue
Expand Down

0 comments on commit 884c9d4

Please sign in to comment.