Skip to content

Commit

Permalink
Merge pull request etcd-io#17007 from serathius/remove-v2-applier
Browse files Browse the repository at this point in the history
Remove v2 applier
  • Loading branch information
serathius authored Nov 23, 2023
2 parents b17c1de + 093666f commit 5426f6d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 146 deletions.
152 changes: 16 additions & 136 deletions server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,154 +16,34 @@ package etcdserver

import (
"encoding/json"
"fmt"
"path"
"time"
"unicode/utf8"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"

"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/etcdserver/txn"

"go.uber.org/zap"
)

const v2Version = "v2"

// ApplierV2 is the interface for processing V2 raft messages
type ApplierV2 interface {
Delete(r *RequestV2) Response
Post(r *RequestV2) Response
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
QGet(r *RequestV2) Response
Sync(r *RequestV2) Response
}

func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
if lg == nil {
lg = zap.NewNop()
}
return &applierV2store{lg: lg, store: s, cluster: c}
}

type applierV2store struct {
lg *zap.Logger
store v2store.Store
cluster *membership.RaftCluster
}

func (a *applierV2store) Delete(r *RequestV2) Response {
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
default:
return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
}
}

func (a *applierV2store) Post(r *RequestV2) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
}

func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
ttlOptions := r.TTLOptions()
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
}
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
}
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
// return an empty response since there is no consumer.
return Response{}
}
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return Response{}
}
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
}
}

func (a *applierV2store) QGet(r *RequestV2) Response {
return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
}

func (a *applierV2store) Sync(r *RequestV2) Response {
a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
return Response{}
}

// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) {
if r.Method != "PUT" || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) {
s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
}
stringer := panicAlternativeStringer{
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
}
defer func(start time.Time) {
if !utf8.ValidString(r.Method) {
s.lg.Info("method is not valid utf-8")
return
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if s.cluster != nil {
s.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
success := resp.Err == nil
txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start))
txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
}(time.Now())

switch r.Method {
case "POST":
return s.applyV2.Post(r)
case "PUT":
return s.applyV2.Put(r, shouldApplyV3)
case "DELETE":
return s.applyV2.Delete(r)
case "QGET":
return s.applyV2.QGet(r)
case "SYNC":
return s.applyV2.Sync(r)
default:
// This should never be reached, but just in case:
return Response{Err: errors.ErrUnknownMethod}
}
}

func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := v2store.TTLOptionSet{Refresh: refresh}
if r.Expiration != 0 {
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if s.cluster != nil {
// persist to backend given v2store can be very stale
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
}
return ttlOptions
}

func toResponse(ev *v2store.Event, err error) Response {
return Response{Event: ev, Err: err}
}
9 changes: 4 additions & 5 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,6 @@ type EtcdServer struct {
v2store v2store.Store
snapshotter *snap.Snapshotter

applyV2 ApplierV2

uberApply apply.UberApplier

applyWait wait.WaitTime
Expand Down Expand Up @@ -336,7 +334,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

srv.be = b.storage.backend.be
srv.beHooks = b.storage.backend.beHooks
Expand Down Expand Up @@ -1860,14 +1857,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
s.applyV2Request((*RequestV2)(rp), shouldApplyV3)
s.w.Trigger(r.ID, Response{})
return
}
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))

if raftReq.V2 != nil {
req := (*RequestV2)(raftReq.V2)
s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
s.applyV2Request(req, shouldApplyV3)
s.w.Trigger(req.ID, Response{})
return
}

Expand Down
5 changes: 0 additions & 5 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func TestApplyRepeat(t *testing.T) {
consistIndex: cindex.NewFakeConsistentIndex(0),
uberApply: uberApplierMock{},
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
s.start()
req := &pb.InternalRaftRequest{
Header: &pb.RequestHeader{ID: 1},
Expand Down Expand Up @@ -159,7 +158,6 @@ func TestV2SetMemberAttributes(t *testing.T) {
v2store: mockstore.NewRecorder(),
cluster: cl,
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

req := pb.Request{
Method: "PUT",
Expand Down Expand Up @@ -187,7 +185,6 @@ func TestV2SetClusterVersion(t *testing.T) {
v2store: mockstore.NewRecorder(),
cluster: cl,
}
srv.applyV2 = NewApplierV2(srv.lg, srv.v2store, srv.cluster)

req := pb.Request{
Method: "PUT",
Expand Down Expand Up @@ -580,7 +577,6 @@ func TestSnapshotOrdering(t *testing.T) {
consistIndex: ci,
beHooks: serverstorage.NewBackendHooks(lg, ci),
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}

s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.be = be
Expand Down Expand Up @@ -675,7 +671,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
uberApply: uberApplierMock{},
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 1),
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}

s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.be = be
Expand Down

0 comments on commit 5426f6d

Please sign in to comment.