Skip to content

Commit

Permalink
Merge pull request etcd-io#16418 from geetasg/pr7
Browse files Browse the repository at this point in the history
Update to generate v2 snapshot from v3 state
  • Loading branch information
serathius committed Aug 22, 2023
2 parents e1b8e8c + 59332dc commit f3cc759
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 31 deletions.
9 changes: 2 additions & 7 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.v2store.Clone()
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
Expand All @@ -2075,16 +2074,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)

s.GoAttach(func() {
lg := s.Logger()

d, err := clone.SaveNoCopy()
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
if err != nil {
lg.Panic("failed to save v2 store", zap.Error(err))
}
// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
Expand Down
31 changes: 12 additions & 19 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,10 @@ func TestSnapshot(t *testing.T) {
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.be = be

ch := make(chan struct{}, 2)
cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl

ch := make(chan struct{}, 1)

go func() {
gaction, _ := p.Wait(2)
Expand All @@ -1066,24 +1069,11 @@ func TestSnapshot(t *testing.T) {
}
}()

go func() {
gaction, _ := st.Wait(2)
defer func() { ch <- struct{}{} }()

if len(gaction) != 2 {
t.Errorf("len(action) = %d, want 2", len(gaction))
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
t.Errorf("action = %s, want Clone", gaction[0])
}
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
t.Errorf("action = %s, want SaveNoCopy", gaction[1])
}
}()

srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
}
}

// TestSnapshotOrdering ensures raft persists snapshot onto disk before
Expand All @@ -1098,7 +1088,8 @@ func TestSnapshotOrdering(t *testing.T) {
n := newNopReadyNode()
st := v2store.New()
cl := membership.NewCluster(lg)
cl.SetStore(st)
be, _ := betesting.NewDefaultTmpBackend(t)
cl.SetBackend(schema.NewMembershipBackend(lg, be))

testdir := t.TempDir()

Expand All @@ -1118,7 +1109,6 @@ func TestSnapshotOrdering(t *testing.T) {
storage: p,
raftStorage: rs,
})
be, _ := betesting.NewDefaultTmpBackend(t)
ci := cindex.NewConsistentIndex(be)
s := &EtcdServer{
lgMu: new(sync.RWMutex),
Expand Down Expand Up @@ -1211,6 +1201,9 @@ func TestTriggerSnap(t *testing.T) {
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.be = be

cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl

srv.start()

donec := make(chan struct{})
Expand Down
6 changes: 1 addition & 5 deletions server/etcdserver/snapshot_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ import (
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
lg := s.Logger()
// get a snapshot of v2 store as []byte
clone := s.v2store.Clone()
d, err := clone.SaveNoCopy()
if err != nil {
lg.Panic("failed to save v2 store data", zap.Error(err))
}
d := GetMembershipInfoInV2Format(lg, s.cluster)

// commit kv to write metadata(for example: consistent index).
s.KV().Commit()
Expand Down

0 comments on commit f3cc759

Please sign in to comment.