Skip to content

Commit

Permalink
refactor lease renew
Browse files Browse the repository at this point in the history
  • Loading branch information
ahrtr committed Jun 7, 2022
1 parent 08f4c34 commit ee54561
Show file tree
Hide file tree
Showing 20 changed files with 413 additions and 351 deletions.
157 changes: 67 additions & 90 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Documentation/dev-guide/apispec/swagger/v3election.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -56,7 +56,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -98,7 +98,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -131,7 +131,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -164,7 +164,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -203,7 +203,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
"description": "revision is the key-value store revision when the request was applied, and it's\nunset (so 0) in case of calls not interacting with key-value store.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
6 changes: 3 additions & 3 deletions Documentation/dev-guide/apispec/swagger/v3lock.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -56,7 +56,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -95,7 +95,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
"description": "revision is the key-value store revision when the request was applied, and it's\nunset (so 0) in case of calls not interacting with key-value store.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
131 changes: 131 additions & 0 deletions api/etcdserverpb/gw/rpc.pb.gw.go

Large diffs are not rendered by default.

189 changes: 122 additions & 67 deletions api/etcdserverpb/raft_internal.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/etcdserverpb/raft_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message InternalRaftRequest {
AlarmRequest alarm = 10;

LeaseCheckpointRequest lease_checkpoint = 11 [(versionpb.etcd_version_field) = "3.4"];
LeaseKeepAliveRequest lease_renew = 12 [(versionpb.etcd_version_field) = "3.6"];

AuthEnableRequest auth_enable = 1000;
AuthDisableRequest auth_disable = 1011;
Expand Down
1 change: 1 addition & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ etcdserverpb.InternalRaftRequest.downgrade_info_set: "3.5"
etcdserverpb.InternalRaftRequest.header: ""
etcdserverpb.InternalRaftRequest.lease_checkpoint: "3.4"
etcdserverpb.InternalRaftRequest.lease_grant: ""
etcdserverpb.InternalRaftRequest.lease_renew: "3.6"
etcdserverpb.InternalRaftRequest.lease_revoke: ""
etcdserverpb.InternalRaftRequest.put: ""
etcdserverpb.InternalRaftRequest.range: ""
Expand Down
1 change: 0 additions & 1 deletion server/etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func newPeerHandler(
mux.Handle(peerMembersPath, peerMembersHandler)
mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions server/etcdserver/api/v3lock/v3lockpb/gw/v3lock.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions server/etcdserver/api/v3rpc/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,21 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
// or remote leader.
// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
// at rev 4.
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)
// todo(ahrtr): remove respForLeaseNotFound, we don't need to ErrLeaseNotFound separately.
respForLeaseNotFound := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(respForLeaseNotFound.Header)

ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
resp, err := ls.le.LeaseRenew(stream.Context(), req)
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
respForLeaseNotFound.TTL = 0
resp = respForLeaseNotFound
}

if err != nil {
return togRPCError(err)
}

resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
if isClientCtxErr(stream.Context().Err(), err) {
Expand Down
6 changes: 6 additions & 0 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type applierV3 interface {

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

Expand Down Expand Up @@ -206,6 +207,11 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return &pb.LeaseRevokeResponse{Header: a.newHeader()}, err
}

func (a *applierV3backend) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
ttl, err := a.lessor.Renew(lease.LeaseID(lc.ID))
return &pb.LeaseKeepAliveResponse{Header: a.newHeader(), ID: lc.ID, TTL: ttl}, err
}

func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
for _, c := range lc.Checkpoints {
err := a.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func (a *applierV3Corrupt) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantRe
func (a *applierV3Corrupt) LeaseRevoke(_ *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
return nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
return nil, errors.ErrCorrupt
}
3 changes: 3 additions & 0 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.LeaseRenew != nil:
op = "LeaseRenew"
ar.Resp, ar.Err = a.applyV3.LeaseRenew(r.LeaseRenew)
case r.Alarm != nil:
op = "Alarm"
ar.Resp, ar.Err = a.Alarm(r.Alarm)
Expand Down
48 changes: 7 additions & 41 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ type Lessor interface {
// LeaseRevoke sends LeaseRevoke request to raft and toApply it after committed.
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
// LeaseRenew renews the lease.
LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
Expand Down Expand Up @@ -276,45 +275,12 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return resp.(*pb.LeaseRevokeResponse), nil
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
// Throttle in case of e.g. connection problems.
time.Sleep(50 * time.Millisecond)
}

if cctx.Err() == context.DeadlineExceeded {
return -1, errors.ErrTimeout
func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r})
if err != nil {
return nil, err
}
return -1, errors.ErrCanceled
return resp.(*pb.LeaseKeepAliveResponse), err
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
Expand Down
Loading

0 comments on commit ee54561

Please sign in to comment.