From ec314c8831644a8d0fa327f0a7f952b74257a0f5 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 19 Nov 2023 15:28:44 +0100 Subject: [PATCH 1/6] Remove code for setting cluster version via V2 API Signed-off-by: Marek Siarkowicz --- server/etcdserver/server.go | 40 -------------------------- server/etcdserver/server_test.go | 48 -------------------------------- 2 files changed, 88 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4b40e32bada..1627b7e74a2 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2267,46 +2267,6 @@ func (s *EtcdServer) monitorCompactHash() { } } -func (s *EtcdServer) updateClusterVersionV2(ver string) { - lg := s.Logger() - - if s.cluster.Version() == nil { - lg.Info( - "setting up initial cluster version using v2 API", - zap.String("cluster-version", version.Cluster(ver)), - ) - } else { - lg.Info( - "updating cluster version using v2 API", - zap.String("from", version.Cluster(s.cluster.Version().String())), - zap.String("to", version.Cluster(ver)), - ) - } - - req := pb.Request{ - Method: "PUT", - Path: membership.StoreClusterVersionKey(), - Val: ver, - } - - ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) - _, err := s.Do(ctx, req) - cancel() - - switch err { - case nil: - lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) - return - - case errors.ErrStopped: - lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) - return - - default: - lg.Warn("failed to update cluster version", zap.Error(err)) - } -} - func (s *EtcdServer) updateClusterVersionV3(ver string) { lg := s.Logger() diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 7b945942976..01e7b6efca4 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -21,7 +21,6 @@ import ( "math" "net/http" "os" - "path" "path/filepath" "reflect" "sync" @@ -1631,53 +1630,6 @@ func TestPublishV3Retry(t *testing.T) { <-ch } -func TestUpdateVersion(t *testing.T) { - n := newNodeRecorder() - ch := make(chan any, 1) - // simulate that request has gone through consensus - ch <- Response{} - w := wait.NewWithResponse(ch) - ctx, cancel := context.WithCancel(context.TODO()) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - memberId: 1, - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}), - attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, - cluster: &membership.RaftCluster{}, - w: w, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - - ctx: ctx, - cancel: cancel, - } - srv.updateClusterVersionV2("2.0.0") - - action := n.Action() - if len(action) != 1 { - t.Fatalf("len(action) = %d, want 1", len(action)) - } - if action[0].Name != "Propose" { - t.Fatalf("action = %s, want Propose", action[0].Name) - } - data := action[0].Params[0].([]byte) - var r pb.Request - if err := r.Unmarshal(data); err != nil { - t.Fatalf("unmarshal request error: %v", err) - } - if r.Method != "PUT" { - t.Errorf("method = %s, want PUT", r.Method) - } - if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath { - t.Errorf("path = %s, want %s", r.Path, wpath) - } - if r.Val != "2.0.0" { - t.Errorf("val = %s, want %s", r.Val, "2.0.0") - } -} - func TestUpdateVersionV3(t *testing.T) { n := newNodeRecorder() ch := make(chan any, 1) From 3de71d0f070f641d5283bbe61f44410d28cc0ad0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 19 Nov 2023 15:32:53 +0100 Subject: [PATCH 2/6] Remove code used to make v2 proposals Signed-off-by: Marek Siarkowicz --- server/etcdserver/server.go | 2 - server/etcdserver/server_test.go | 312 ------------------------------- server/etcdserver/v2_server.go | 141 -------------- 3 files changed, 455 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1627b7e74a2..96098793626 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -140,8 +140,6 @@ type ServerV2 interface { Server Leader() types.ID - // Do takes a V2 request and attempts to fulfill it, returning a Response. - Do(ctx context.Context, r pb.Request) (Response, error) ClientCertAuthEnabled() bool } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 01e7b6efca4..62e32c77693 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -62,123 +62,6 @@ import ( "go.etcd.io/raft/v3/raftpb" ) -// TestDoLocalAction tests requests which do not need to go through raft to be applied, -// and are served through local data. -func TestDoLocalAction(t *testing.T) { - tests := []struct { - req pb.Request - - wresp Response - werr error - wactions []testutil.Action - }{ - { - pb.Request{Method: "GET", ID: 1, Wait: true}, - Response{Watcher: v2store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}}, - }, - { - pb.Request{Method: "GET", ID: 1}, - Response{Event: &v2store.Event{}}, nil, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - { - pb.Request{Method: "HEAD", ID: 1}, - Response{Event: &v2store.Event{}}, nil, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - { - pb.Request{Method: "BADMETHOD", ID: 1}, - Response{}, errors.ErrUnknownMethod, []testutil.Action{}, - }, - } - for i, tt := range tests { - st := mockstore.NewRecorder() - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - resp, err := srv.Do(context.Background(), tt.req) - - if err != tt.werr { - t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr) - } - if !reflect.DeepEqual(resp, tt.wresp) { - t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) - } - gaction := st.Action() - if !reflect.DeepEqual(gaction, tt.wactions) { - t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions) - } - } -} - -// TestDoBadLocalAction tests server requests which do not need to go through consensus, -// and return errors when they fetch from local data. -func TestDoBadLocalAction(t *testing.T) { - storeErr := fmt.Errorf("bah") - tests := []struct { - req pb.Request - - wactions []testutil.Action - }{ - { - pb.Request{Method: "GET", ID: 1, Wait: true}, - []testutil.Action{{Name: "Watch"}}, - }, - { - pb.Request{Method: "GET", ID: 1}, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - { - pb.Request{Method: "HEAD", ID: 1}, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - } - for i, tt := range tests { - st := mockstore.NewErrRecorder(storeErr) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - resp, err := srv.Do(context.Background(), tt.req) - - if err != storeErr { - t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr) - } - if !reflect.DeepEqual(resp, Response{}) { - t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{}) - } - gaction := st.Action() - if !reflect.DeepEqual(gaction, tt.wactions) { - t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions) - } - } -} - // TestApplyRepeat tests that server handles repeat raft messages gracefully func TestApplyRepeat(t *testing.T) { lg := zaptest.NewLogger(t) @@ -795,115 +678,6 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { } } -func TestDoProposal(t *testing.T) { - tests := []pb.Request{ - {Method: "POST", ID: 1}, - {Method: "PUT", ID: 1}, - {Method: "DELETE", ID: 1}, - {Method: "GET", ID: 1, Quorum: true}, - } - for i, tt := range tests { - st := mockstore.NewRecorder() - r := newRaftNode(raftNodeConfig{ - lg: zaptest.NewLogger(t), - Node: newNodeCommitter(), - storage: mockstorage.NewStorageRecorder(""), - raftStorage: raft.NewMemoryStorage(), - transport: newNopTransporter(), - }) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - consistIndex: cindex.NewFakeConsistentIndex(0), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - srv.start() - resp, err := srv.Do(context.Background(), tt) - srv.Stop() - - action := st.Action() - if len(action) != 1 { - t.Errorf("#%d: len(action) = %d, want 1", i, len(action)) - } - if err != nil { - t.Fatalf("#%d: err = %v, want nil", i, err) - } - // resp.Index is set in Do() based on the raft state; may either be 0 or 1 - wresp := Response{Event: &v2store.Event{}, Index: resp.Index} - if !reflect.DeepEqual(resp, wresp) { - t.Errorf("#%d: resp = %v, want %v", i, resp, wresp) - } - } -} - -func TestDoProposalCancelled(t *testing.T) { - wt := mockwait.NewRecorder() - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), - w: wt, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) - - if err != errors.ErrCanceled { - t.Fatalf("err = %v, want %v", err, errors.ErrCanceled) - } - w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}} - if !reflect.DeepEqual(wt.Action(), w) { - t.Errorf("wt.action = %+v, want %+v", wt.Action(), w) - } -} - -func TestDoProposalTimeout(t *testing.T) { - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), - w: mockwait.NewNop(), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - ctx, cancel := context.WithTimeout(context.Background(), 0) - _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) - cancel() - if err != errors.ErrTimeout { - t.Fatalf("err = %v, want %v", err, errors.ErrTimeout) - } -} - -func TestDoProposalStopped(t *testing.T) { - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: newNodeNop()}), - w: mockwait.NewNop(), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - srv.stopping = make(chan struct{}) - close(srv.stopping) - _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1}) - if err != errors.ErrStopped { - t.Errorf("err = %v, want %v", err, errors.ErrStopped) - } -} - // TestSync tests sync 1. is nonblocking 2. proposes SYNC request. func TestSync(t *testing.T) { n := newNodeRecorder() @@ -1190,73 +964,6 @@ func TestSnapshotOrdering(t *testing.T) { } } -// TestTriggerSnap for Applied > SnapshotCount should trigger a SaveSnap event -func TestTriggerSnap(t *testing.T) { - be, tmpPath := betesting.NewDefaultTmpBackend(t) - defer func() { - os.RemoveAll(tmpPath) - }() - - snapc := 10 - st := mockstore.NewRecorder() - p := mockstorage.NewStorageRecorderStream("") - r := newRaftNode(raftNodeConfig{ - lg: zaptest.NewLogger(t), - Node: newNodeCommitter(), - raftStorage: raft.NewMemoryStorage(), - storage: p, - transport: newNopTransporter(), - }) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - consistIndex: cindex.NewConsistentIndex(be), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) - srv.be = be - - cl := membership.NewCluster(zaptest.NewLogger(t)) - srv.cluster = cl - - srv.start() - - donec := make(chan struct{}) - go func() { - defer close(donec) - wcnt := 3 + snapc - gaction, _ := p.Wait(wcnt) - - // each operation is recorded as a Save - // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release - if len(gaction) != wcnt { - t.Logf("gaction: %v", gaction) - t.Errorf("len(action) = %d, want %d", len(gaction), wcnt) - return - } - if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2]) - } - - if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) { - t.Errorf("action = %s, want Release", gaction[wcnt-1]) - } - }() - - for i := 0; i < snapc+1; i++ { - srv.Do(context.Background(), pb.Request{Method: "PUT"}) - } - - <-donec - srv.Stop() -} - // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with // proposals. func TestConcurrentApplyAndSnapshotV3(t *testing.T) { @@ -1866,25 +1573,6 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange return &raftpb.ConfState{} } -// nodeCommitter commits proposed data immediately. -type nodeCommitter struct { - readyNode - index uint64 -} - -func newNodeCommitter() raft.Node { - return &nodeCommitter{*newNopReadyNode(), 0} -} -func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { - n.index++ - ents := []raftpb.Entry{{Index: n.index, Data: data}} - n.readyc <- raft.Ready{ - Entries: ents, - CommittedEntries: ents, - } - return nil -} - func newTestCluster(t testing.TB) *membership.RaftCluster { return membership.NewCluster(zaptest.NewLogger(t)) } diff --git a/server/etcdserver/v2_server.go b/server/etcdserver/v2_server.go index 517d7ca7f70..8636204b544 100644 --- a/server/etcdserver/v2_server.go +++ b/server/etcdserver/v2_server.go @@ -15,152 +15,11 @@ package etcdserver import ( - "context" - "time" - pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" - "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/server/v3/etcdserver/errors" ) type RequestV2 pb.Request -type RequestV2Handler interface { - Post(ctx context.Context, r *RequestV2) (Response, error) - Put(ctx context.Context, r *RequestV2) (Response, error) - Delete(ctx context.Context, r *RequestV2) (Response, error) - QGet(ctx context.Context, r *RequestV2) (Response, error) - Get(ctx context.Context, r *RequestV2) (Response, error) - Head(ctx context.Context, r *RequestV2) (Response, error) -} - -type reqV2HandlerEtcdServer struct { - reqV2HandlerStore - s *EtcdServer -} - -type reqV2HandlerStore struct { - store v2store.Store - applier ApplierV2 -} - -func NewStoreRequestV2Handler(s v2store.Store, applier ApplierV2) RequestV2Handler { - return &reqV2HandlerStore{s, applier} -} - -func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Post(r), nil -} - -func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Put(r, membership.ApplyBoth), nil -} - -func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Delete(r), nil -} - -func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.QGet(r), nil -} - -func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) { - if r.Wait { - wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since) - return Response{Watcher: wc}, err - } - ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted) - return Response{Event: ev}, err -} - -func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) { - ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted) - return Response{Event: ev}, err -} - -func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) { - data, err := ((*pb.Request)(r)).Marshal() - if err != nil { - return Response{}, err - } - ch := a.s.w.Register(r.ID) - - start := time.Now() - a.s.r.Propose(ctx, data) - proposalsPending.Inc() - defer proposalsPending.Dec() - - select { - case x := <-ch: - resp := x.(Response) - return resp, resp.Err - case <-ctx.Done(): - proposalsFailed.Inc() - a.s.w.Trigger(r.ID, nil) // GC wait - return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start) - case <-a.s.stopping: - } - return Response{}, errors.ErrStopped -} - -func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { - r.ID = s.reqIDGen.Next() - h := &reqV2HandlerEtcdServer{ - reqV2HandlerStore: reqV2HandlerStore{ - store: s.v2store, - applier: s.applyV2, - }, - s: s, - } - rp := &r - resp, err := ((*RequestV2)(rp)).Handle(ctx, h) - resp.Term, resp.Index = s.Term(), s.CommittedIndex() - return resp, err -} - -// Handle interprets r and performs an operation on s.store according to r.Method -// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with -// Quorum == true, r will be sent through consensus before performing its -// respective operation. Do will block until an action is performed or there is -// an error. -func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) { - if r.Method == "GET" && r.Quorum { - r.Method = "QGET" - } - switch r.Method { - case "POST": - return v2api.Post(ctx, r) - case "PUT": - return v2api.Put(ctx, r) - case "DELETE": - return v2api.Delete(ctx, r) - case "QGET": - return v2api.QGet(ctx, r) - case "GET": - return v2api.Get(ctx, r) - case "HEAD": - return v2api.Head(ctx, r) - } - return Response{}, errors.ErrUnknownMethod -} - func (r *RequestV2) String() string { rpb := pb.Request(*r) return rpb.String() From fcf9a6fbb4399f4ea04ab60902ef2b89e10da6dd Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 19 Nov 2023 15:36:01 +0100 Subject: [PATCH 3/6] Remove apply v2 logic that is no longer used v2 store is no longer available in v3.6. Only v2 PUT is neeeded as it applies to v3 storage and etcd v3.5 uses it for setting member attributes and cluster version. We can ignore all other requests. Signed-off-by: Marek Siarkowicz --- server/etcdserver/apply_v2.go | 34 ----- server/etcdserver/server_test.go | 243 ------------------------------- 2 files changed, 277 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index c9e4c3e87b0..6c67be11abe 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -37,11 +37,7 @@ const v2Version = "v2" // ApplierV2 is the interface for processing V2 raft messages type ApplierV2 interface { - Delete(r *RequestV2) Response - Post(r *RequestV2) Response Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response - QGet(r *RequestV2) Response - Sync(r *RequestV2) Response } func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 { @@ -57,19 +53,6 @@ type applierV2store struct { cluster *membership.RaftCluster } -func (a *applierV2store) Delete(r *RequestV2) Response { - switch { - case r.PrevIndex > 0 || r.PrevValue != "": - return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) - default: - return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive)) - } -} - -func (a *applierV2store) Post(r *RequestV2) Response { - return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions())) -} - func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response { ttlOptions := r.TTLOptions() exists, existsSet := pbutil.GetBool(r.PrevExist) @@ -109,15 +92,6 @@ func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV } } -func (a *applierV2store) QGet(r *RequestV2) Response { - return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted)) -} - -func (a *applierV2store) Sync(r *RequestV2) Response { - a.store.DeleteExpiredKeys(time.Unix(0, r.Time)) - return Response{} -} - // applyV2Request interprets r as a call to v2store.X // and returns a Response interpreted from v2store.Event func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) { @@ -136,16 +110,8 @@ func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.Shoul }(time.Now()) switch r.Method { - case "POST": - return s.applyV2.Post(r) case "PUT": return s.applyV2.Put(r, shouldApplyV3) - case "DELETE": - return s.applyV2.Delete(r) - case "QGET": - return s.applyV2.QGet(r) - case "SYNC": - return s.applyV2.Sync(r) default: // This should never be reached, but just in case: return Response{Err: errors.ErrUnknownMethod} diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 62e32c77693..04f490a4993 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -133,249 +133,6 @@ func TestApplyRepeat(t *testing.T) { } } -func TestApplyRequest(t *testing.T) { - tests := []struct { - req pb.Request - - wresp Response - wactions []testutil.Action - }{ - // POST ==> Create - { - pb.Request{Method: "POST", ID: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Create", - Params: []any{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // POST ==> Create, with expiration - { - pb.Request{Method: "POST", ID: 1, Expiration: 1337}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Create", - Params: []any{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}}, - }, - }, - }, - // POST ==> Create, with dir - { - pb.Request{Method: "POST", ID: 1, Dir: true}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Create", - Params: []any{"", true, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT ==> Set - { - pb.Request{Method: "PUT", ID: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Set", - Params: []any{"", false, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT ==> Set, with dir - { - pb.Request{Method: "PUT", ID: 1, Dir: true}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Set", - Params: []any{"", true, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT with PrevExist=true ==> Update - { - pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Update", - Params: []any{"", "", v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT with PrevExist=false ==> Create - { - pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Create", - Params: []any{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap - { - pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "CompareAndSwap", - Params: []any{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT with PrevExist=false *and* PrevIndex set ==> Create - { - pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Create", - Params: []any{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT with PrevIndex set ==> CompareAndSwap - { - pb.Request{Method: "PUT", ID: 1, PrevIndex: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "CompareAndSwap", - Params: []any{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT with PrevValue set ==> CompareAndSwap - { - pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "CompareAndSwap", - Params: []any{"", "bar", uint64(0), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // PUT with PrevIndex and PrevValue set ==> CompareAndSwap - { - pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "CompareAndSwap", - Params: []any{"", "bar", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}}, - }, - }, - }, - // DELETE ==> Delete - { - pb.Request{Method: "DELETE", ID: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Delete", - Params: []any{"", false, false}, - }, - }, - }, - // DELETE with PrevIndex set ==> CompareAndDelete - { - pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "CompareAndDelete", - Params: []any{"", "", uint64(1)}, - }, - }, - }, - // DELETE with PrevValue set ==> CompareAndDelete - { - pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "CompareAndDelete", - Params: []any{"", "bar", uint64(0)}, - }, - }, - }, - // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete - { - pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "CompareAndDelete", - Params: []any{"", "bar", uint64(5)}, - }, - }, - }, - // QGET ==> Get - { - pb.Request{Method: "QGET", ID: 1}, - Response{Event: &v2store.Event{}}, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - // SYNC ==> DeleteExpiredKeys - { - pb.Request{Method: "SYNC", ID: 1}, - Response{}, - []testutil.Action{ - { - Name: "DeleteExpiredKeys", - Params: []any{time.Unix(0, 0)}, - }, - }, - }, - { - pb.Request{Method: "SYNC", ID: 1, Time: 12345}, - Response{}, - []testutil.Action{ - { - Name: "DeleteExpiredKeys", - Params: []any{time.Unix(0, 12345)}, - }, - }, - }, - // Unknown method - error - { - pb.Request{Method: "BADMETHOD", ID: 1}, - Response{Err: errors.ErrUnknownMethod}, - []testutil.Action{}, - }, - } - - for i, tt := range tests { - st := mockstore.NewRecorder() - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: st, - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - resp := srv.applyV2Request((*RequestV2)(&tt.req), membership.ApplyBoth) - - if !reflect.DeepEqual(resp, tt.wresp) { - t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) - } - gaction := st.Action() - if !reflect.DeepEqual(gaction, tt.wactions) { - t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions) - } - } -} - func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) From b050939a69d1948362c8c39238a69b38ccdb69d3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 19 Nov 2023 22:34:18 +0100 Subject: [PATCH 4/6] Remove v2 monitoring Signed-off-by: Marek Siarkowicz --- server/etcdserver/apply_v2.go | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index 6c67be11abe..dcbf8a44a0c 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -16,25 +16,20 @@ package etcdserver import ( "encoding/json" - "fmt" "path" "time" - "unicode/utf8" "github.com/coreos/go-semver/semver" + "go.uber.org/zap" + "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/errors" - "go.etcd.io/etcd/server/v3/etcdserver/txn" - - "go.uber.org/zap" ) -const v2Version = "v2" - // ApplierV2 is the interface for processing V2 raft messages type ApplierV2 interface { Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response @@ -95,20 +90,6 @@ func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV // applyV2Request interprets r as a call to v2store.X // and returns a Response interpreted from v2store.Event func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) { - stringer := panicAlternativeStringer{ - stringer: r, - alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) }, - } - defer func(start time.Time) { - if !utf8.ValidString(r.Method) { - s.lg.Info("method is not valid utf-8") - return - } - success := resp.Err == nil - txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start)) - txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil) - }(time.Now()) - switch r.Method { case "PUT": return s.applyV2.Put(r, shouldApplyV3) From 22e59e05c14802a8c74bf4056cef6699b410e198 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 19 Nov 2023 22:37:39 +0100 Subject: [PATCH 5/6] Remove put code used for non cluster operations Signed-off-by: Marek Siarkowicz --- server/etcdserver/apply_v2.go | 51 +++++++++++------------------------ 1 file changed, 15 insertions(+), 36 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index dcbf8a44a0c..c12efa55547 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -49,42 +49,25 @@ type applierV2store struct { } func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response { - ttlOptions := r.TTLOptions() - exists, existsSet := pbutil.GetBool(r.PrevExist) - switch { - case existsSet: - if exists { - if r.PrevIndex == 0 && r.PrevValue == "" { - return toResponse(a.store.Update(r.Path, r.Val, ttlOptions)) - } - return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) + if storeMemberAttributeRegexp.MatchString(r.Path) { + id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path)) + var attr membership.Attributes + if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { + a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) } - return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) - case r.PrevIndex > 0 || r.PrevValue != "": - return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) - default: - if storeMemberAttributeRegexp.MatchString(r.Path) { - id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path)) - var attr membership.Attributes - if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) - } - if a.cluster != nil { - a.cluster.UpdateAttributes(id, attr, shouldApplyV3) - } - // return an empty response since there is no consumer. - return Response{} + if a.cluster != nil { + a.cluster.UpdateAttributes(id, attr, shouldApplyV3) } - // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 - if r.Path == membership.StoreClusterVersionKey() { - if a.cluster != nil { - // persist to backend given v2store can be very stale - a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) - } - return Response{} + } + // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 + if r.Path == membership.StoreClusterVersionKey() { + if a.cluster != nil { + // persist to backend given v2store can be very stale + a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) } - return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) } + // return an empty response since there is no consumer. + return Response{} } // applyV2Request interprets r as a call to v2store.X @@ -107,7 +90,3 @@ func (r *RequestV2) TTLOptions() v2store.TTLOptionSet { } return ttlOptions } - -func toResponse(ev *v2store.Event, err error) Response { - return Response{Event: ev, Err: err} -} From d47cb7bc9477faacb116504d87fcbfe839405fcc Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 19 Nov 2023 22:45:50 +0100 Subject: [PATCH 6/6] Remove v2 applier Signed-off-by: Marek Siarkowicz --- server/etcdserver/apply_v2.go | 63 +++++++++++--------------------- server/etcdserver/server.go | 3 -- server/etcdserver/server_test.go | 6 --- 3 files changed, 21 insertions(+), 51 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index c12efa55547..521eca4ded4 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -21,61 +21,40 @@ import ( "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/server/v3/etcdserver/api" + "go.uber.org/zap" "go.etcd.io/etcd/pkg/v3/pbutil" - "go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/errors" ) -// ApplierV2 is the interface for processing V2 raft messages -type ApplierV2 interface { - Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response -} - -func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 { - if lg == nil { - lg = zap.NewNop() - } - return &applierV2store{lg: lg, store: s, cluster: c} -} - -type applierV2store struct { - lg *zap.Logger - store v2store.Store - cluster *membership.RaftCluster -} - -func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response { - if storeMemberAttributeRegexp.MatchString(r.Path) { - id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path)) - var attr membership.Attributes - if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) - } - if a.cluster != nil { - a.cluster.UpdateAttributes(id, attr, shouldApplyV3) - } - } - // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 - if r.Path == membership.StoreClusterVersionKey() { - if a.cluster != nil { - // persist to backend given v2store can be very stale - a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) - } - } - // return an empty response since there is no consumer. - return Response{} -} - // applyV2Request interprets r as a call to v2store.X // and returns a Response interpreted from v2store.Event func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) { switch r.Method { case "PUT": - return s.applyV2.Put(r, shouldApplyV3) + if storeMemberAttributeRegexp.MatchString(r.Path) { + id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path)) + var attr membership.Attributes + if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { + s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) + } + if s.cluster != nil { + s.cluster.UpdateAttributes(id, attr, shouldApplyV3) + } + } + // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 + if r.Path == membership.StoreClusterVersionKey() { + if s.cluster != nil { + // persist to backend given v2store can be very stale + s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) + } + } + // return an empty response since there is no consumer. + return Response{} default: // This should never be reached, but just in case: return Response{Err: errors.ErrUnknownMethod} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 96098793626..9eed122b1bb 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -245,8 +245,6 @@ type EtcdServer struct { v2store v2store.Store snapshotter *snap.Snapshotter - applyV2 ApplierV2 - uberApply apply.UberApplier applyWait wait.WaitTime @@ -336,7 +334,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) - srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be srv.beHooks = b.storage.backend.beHooks diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 04f490a4993..321ecc7b9d2 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -93,7 +93,6 @@ func TestApplyRepeat(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), } - s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} s.start() req := &pb.Request{Method: "QGET", ID: uint64(1)} ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} @@ -143,7 +142,6 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { v2store: mockstore.NewRecorder(), cluster: cl, } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} req := pb.Request{ Method: "PUT", @@ -447,7 +445,6 @@ func TestSync(t *testing.T) { ctx: ctx, cancel: cancel, } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} // check that sync is non-blocking done := make(chan struct{}, 1) @@ -492,7 +489,6 @@ func TestSyncTimeout(t *testing.T) { ctx: ctx, cancel: cancel, } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} // check that sync is non-blocking done := make(chan struct{}, 1) @@ -674,7 +670,6 @@ func TestSnapshotOrdering(t *testing.T) { consistIndex: ci, beHooks: serverstorage.NewBackendHooks(lg, ci), } - s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{}) s.be = be @@ -765,7 +760,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { consistIndex: ci, beHooks: serverstorage.NewBackendHooks(lg, ci), } - s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{}) s.be = be