Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove v2 store apply logic #16966

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 17 additions & 112 deletions server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,136 +16,45 @@ package etcdserver

import (
"encoding/json"
"fmt"
"path"
"time"
"unicode/utf8"

"github.com/coreos/go-semver/semver"

"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api"

"go.uber.org/zap"

"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/etcdserver/txn"

"go.uber.org/zap"
)

const v2Version = "v2"

// ApplierV2 is the interface for processing V2 raft messages
type ApplierV2 interface {
Delete(r *RequestV2) Response
Post(r *RequestV2) Response
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
QGet(r *RequestV2) Response
Sync(r *RequestV2) Response
}

func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
if lg == nil {
lg = zap.NewNop()
}
return &applierV2store{lg: lg, store: s, cluster: c}
}

type applierV2store struct {
lg *zap.Logger
store v2store.Store
cluster *membership.RaftCluster
}

func (a *applierV2store) Delete(r *RequestV2) Response {
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
default:
return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
}
}

func (a *applierV2store) Post(r *RequestV2) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
}

func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
ttlOptions := r.TTLOptions()
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
}
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
}
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
switch r.Method {
case "PUT":
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
if s.cluster != nil {
s.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
// return an empty response since there is no consumer.
return Response{}
}
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
if s.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return Response{}
}
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
}
}

func (a *applierV2store) QGet(r *RequestV2) Response {
return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
}

func (a *applierV2store) Sync(r *RequestV2) Response {
a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
return Response{}
}

// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
stringer := panicAlternativeStringer{
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
}
defer func(start time.Time) {
if !utf8.ValidString(r.Method) {
s.lg.Info("method is not valid utf-8")
return
}
success := resp.Err == nil
txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start))
txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
}(time.Now())

switch r.Method {
case "POST":
return s.applyV2.Post(r)
case "PUT":
return s.applyV2.Put(r, shouldApplyV3)
case "DELETE":
return s.applyV2.Delete(r)
case "QGET":
return s.applyV2.QGet(r)
case "SYNC":
return s.applyV2.Sync(r)
// return an empty response since there is no consumer.
return Response{}
default:
// This should never be reached, but just in case:
return Response{Err: errors.ErrUnknownMethod}
Expand All @@ -160,7 +69,3 @@ func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
}
return ttlOptions
}

func toResponse(ev *v2store.Event, err error) Response {
return Response{Event: ev, Err: err}
}
45 changes: 0 additions & 45 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ type ServerV2 interface {
Server
Leader() types.ID

// Do takes a V2 request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
ClientCertAuthEnabled() bool
}

Expand Down Expand Up @@ -247,8 +245,6 @@ type EtcdServer struct {
v2store v2store.Store
snapshotter *snap.Snapshotter

applyV2 ApplierV2

uberApply apply.UberApplier

applyWait wait.WaitTime
Expand Down Expand Up @@ -338,7 +334,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

srv.be = b.storage.backend.be
srv.beHooks = b.storage.backend.beHooks
Expand Down Expand Up @@ -2267,46 +2262,6 @@ func (s *EtcdServer) monitorCompactHash() {
}
}

func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger()

if s.cluster.Version() == nil {
lg.Info(
"setting up initial cluster version using v2 API",
zap.String("cluster-version", version.Cluster(ver)),
)
} else {
lg.Info(
"updating cluster version using v2 API",
zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)),
)
}

req := pb.Request{
Method: "PUT",
Path: membership.StoreClusterVersionKey(),
Val: ver,
}

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
_, err := s.Do(ctx, req)
cancel()

switch err {
case nil:
lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
return

case errors.ErrStopped:
lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
return

default:
lg.Warn("failed to update cluster version", zap.Error(err))
}
}

func (s *EtcdServer) updateClusterVersionV3(ver string) {
lg := s.Logger()

Expand Down
Loading
Loading