Skip to content

Commit

Permalink
commit transaction for each configuration change
Browse files Browse the repository at this point in the history
Current when etcd remove a member, it just removes it from bbolt db,
but doesn't update the cache. On the other hands, etcd periodically
commit each bbolt transaction. When etcd processes the next conf
change request, it might not have commit the previous member remove
request into bbolt. Accordingly it breaks the linerizability.

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Dec 15, 2023
1 parent 2cf112f commit d0d762a
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
3 changes: 3 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,9 @@ func (s *EtcdServer) apply(
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
if err != nil {
s.lg.Error("failed to apply conf change", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
}
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf
Expand Down
6 changes: 6 additions & 0 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func newServer(t *testing.T, recorder *nodeRecorder) *EtcdServer {
r: *newRaftNode(raftNodeConfig{lg: lg, Node: recorder}),
cluster: membership.NewCluster(lg),
consistIndex: cindex.NewConsistentIndex(be),
kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
srv.cluster.SetBackend(schema.NewMembershipBackend(lg, be))
srv.cluster.SetStore(v2store.New())
Expand Down Expand Up @@ -475,6 +476,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
w: wait.New(),
consistIndex: ci,
beHooks: serverstorage.NewBackendHooks(lg, ci),
kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}

// create EntryConfChange entry
Expand Down Expand Up @@ -567,6 +569,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
w: wait.New(),
consistIndex: ci,
beHooks: serverstorage.NewBackendHooks(lg, ci),
kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
var ents []raftpb.Entry
for i := 1; i <= 4; i++ {
Expand Down Expand Up @@ -876,6 +879,7 @@ func TestAddMember(t *testing.T) {
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: serverstorage.NewBackendHooks(lg, nil),
kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()
m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
Expand Down Expand Up @@ -981,6 +985,7 @@ func TestRemoveMember(t *testing.T) {
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: serverstorage.NewBackendHooks(lg, nil),
kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()
_, err := s.RemoveMember(context.Background(), 1234)
Expand Down Expand Up @@ -1030,6 +1035,7 @@ func TestUpdateMember(t *testing.T) {
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: serverstorage.NewBackendHooks(lg, nil),
kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()
wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
Expand Down
18 changes: 17 additions & 1 deletion server/storage/schema/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,25 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) {

tx := s.be.BatchTx()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafeDelete(Members, mkey)
tx.UnsafePut(MembersRemoved, mkey, []byte("removed"))
tx.Unlock()

// We need to forcibly commit the transaction, otherwise etcd might
// run into a situation that it haven't finished committing the data
// into backend storage (note: etcd periodically commits the bbolt
// transactions instead of on each request) when it receives next
// confChange request. Accordingly, etcd may still reads the stale
// data from bbolt when processing next confChange request. So it
// breaks linearizability.
//
// Note we don't need to forcibly commit the transaction for other
// kinds of request (e.g. normal key/value operations, member add
// or update requests), because there is a buffer on top of the bbolt.
// Each time when etcd reads data from backend storage, it will read
// data from both bbolt and the buffer. But there is no such a buffer
// for member delete requests.
s.be.ForceCommit()
}

func (s *membershipBackend) MustReadMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool) {
Expand Down
17 changes: 9 additions & 8 deletions tests/e2e/ctl_v3_member_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,27 @@ func TestMemberReplace(t *testing.T) {
require.NoError(t, err)
defer epc.Close()

memberId := rand.Int() % len(epc.Procs)
member := epc.Procs[memberId]
memberIdx := rand.Int() % len(epc.Procs)
member := epc.Procs[memberIdx]
memberName := member.Config().Name
var endpoints []string
for i := 1; i < len(epc.Procs); i++ {
endpoints = append(endpoints, epc.Procs[(memberId+i)%len(epc.Procs)].EndpointsGRPC()...)
endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...)
}
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, endpoints)
require.NoError(t, err)

c := epc.Etcdctl()
memberID, found, err := getMemberIdByName(ctx, c, memberName)
memberID, found, err := getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, true, "Member not found")

// Need to wait health interval for cluster to accept member changes
time.Sleep(etcdserver.HealthInterval)

t.Logf("Removing member %s", memberName)
_, err = c.MemberRemove(ctx, memberID)
_, err = cc.MemberRemove(ctx, memberID)
require.NoError(t, err)
_, found, err = getMemberIdByName(ctx, c, memberName)
_, found, err = getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, false, "Expected member to be removed")
for member.IsRunning() {
Expand All @@ -82,12 +81,14 @@ func TestMemberReplace(t *testing.T) {
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
require.NoError(t, err)

// Sleep 100ms to bypass the known issue https://github.com/etcd-io/etcd/issues/16687.
time.Sleep(100 * time.Millisecond)
t.Logf("Starting member %s", memberName)
err = member.Start(ctx)
require.NoError(t, err)
testutils.ExecuteUntil(ctx, t, func() {
for {
_, found, err := getMemberIdByName(ctx, c, memberName)
_, found, err := getMemberIdByName(ctx, cc, memberName)
if err != nil || !found {
time.Sleep(10 * time.Millisecond)
continue
Expand Down

0 comments on commit d0d762a

Please sign in to comment.