storage: Use etcd lease to manage safe point expiration #4984

Closed
Changes from 24 commits
43 commits
32c44ee
added storage methods for RawKV GC
AmoebaProtozoa May 12, 2022
b12eb9b
push back on updating kvproto in go.mod
AmoebaProtozoa May 12, 2022
d0b4f3b
linting
AmoebaProtozoa May 12, 2022
a2ba0e7
changed storage path structure
AmoebaProtozoa May 12, 2022
b9bb3e4
update comments
AmoebaProtozoa May 12, 2022
a7e3ece
added ByKeySpace suffix for disambiguity
AmoebaProtozoa May 12, 2022
7ff125e
removed default key spaces
AmoebaProtozoa May 13, 2022
1bdb642
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 13, 2022
1ebb51d
renaming, move delete expired safepoints to a goroutine
AmoebaProtozoa May 16, 2022
312704a
update tests
AmoebaProtozoa May 16, 2022
766b344
lint
AmoebaProtozoa May 16, 2022
de77dfc
added back KeySpaceGCSafePoint
AmoebaProtozoa May 16, 2022
d6eda93
remove expired all at once
AmoebaProtozoa May 16, 2022
cbe1ed0
address comments
AmoebaProtozoa May 16, 2022
9b27bcd
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 16, 2022
88610a0
move sleep to failpoint
AmoebaProtozoa May 16, 2022
995033e
modified failpoint to eliminate sleep
AmoebaProtozoa May 17, 2022
3099939
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 17, 2022
a825d37
log error when failed to remove expired service safe point
AmoebaProtozoa May 18, 2022
8bef6ee
added load revision
AmoebaProtozoa May 18, 2022
89cf4d0
added SaveWithTTL to kvs
AmoebaProtozoa May 18, 2022
b8bc86f
updated storage methods to use lease
AmoebaProtozoa May 18, 2022
1eca8d9
update tests to use ttl
AmoebaProtozoa May 18, 2022
7e3651e
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa May 18, 2022
78f3e14
merge and resolve conflict
AmoebaProtozoa May 27, 2022
176061e
lint
AmoebaProtozoa May 27, 2022
5d994ea
use etcdutil to save with ttl
AmoebaProtozoa May 27, 2022
a36305a
update go mod
AmoebaProtozoa May 27, 2022
28a4e9e
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa May 30, 2022
d8e7ab6
update go mod
AmoebaProtozoa May 31, 2022
6d2f26d
update test/client/go.mod
AmoebaProtozoa May 31, 2022
594edc7
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 7, 2022
3ab7d4f
small cleanup
AmoebaProtozoa Jun 7, 2022
a379eea
avoid modification to general kv interface
AmoebaProtozoa Jun 8, 2022
152fe6c
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 8, 2022
a8790d5
storage: handle case where ttl is maxInt
AmoebaProtozoa Jun 8, 2022
5269aff
add etcdKVBase comments
AmoebaProtozoa Jun 8, 2022
7c05203
storage: Revision should be int64
AmoebaProtozoa Jun 8, 2022
77d6f26
storage: add revision tests
AmoebaProtozoa Jun 8, 2022
bda31f4
storage: lint
AmoebaProtozoa Jun 8, 2022
9ff8090
storage: change spaceID to uint32
AmoebaProtozoa Jun 14, 2022
ffe8a18
storage: migrate tests to testify
AmoebaProtozoa Jun 21, 2022
ab98f7c
Merge branch 'master' of github.com:tikv/pd into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 21, 2022
161 changes: 161 additions & 0 deletions server/storage/endpoint/gc_key_space.go
@@ -0,0 +1,161 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"encoding/json"
"math"
"strconv"
"strings"

"github.com/pingcap/errors"
"go.etcd.io/etcd/clientv3"
)

// KeySpaceGCSafePoint is gcWorker's safe point for a specific key-space.
type KeySpaceGCSafePoint struct {
SpaceID string `json:"space_id"`
SafePoint uint64 `json:"safe_point,omitempty"`
}

// KeySpaceGCSafePointStorage defines the storage operations on KeySpaces' safe points
type KeySpaceGCSafePointStorage interface {
// Service safe point interfaces.
// NOTE: the field ServiceSafePoint.ExpiredAt is ignored; etcd's lease manages the lifetime instead.
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint, ttl int64) error
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID, serviceID string) error
// GC safe point interfaces.
SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
LoadKeySpaceGCSafePoint(spaceID string) (uint64, error)
LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error)
}

var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)

// SaveServiceSafePoint saves service safe point under given key-space.
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint, ttl int64) error {
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}
key := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
}
return se.SaveWithTTL(key, string(value), ttl)
}

// LoadServiceSafePoint reads the ServiceSafePoint for the given key-space ID and service ID.
// Returns nil if no safe point exists for the given service.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
value, err := se.Load(key)
if err != nil || value == "" {
return nil, err
}
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(value), ssp); err != nil {
return nil, err
}
return ssp, nil
}

// LoadMinServiceSafePoint returns the minimum service safe point for the given key-space.
// Note that the gc worker safe point is stored separately.
// If no service safe point exists for the given key-space, or all of them have expired, it returns nil.
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string) (*ServiceSafePoint, error) {
prefix := KeySpaceServiceSafePointPrefix(spaceID)
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
_, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
min := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i := range values {
ssp := &ServiceSafePoint{}
if err = json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.SafePoint < min.SafePoint {
min = ssp
}
}
if min.SafePoint == math.MaxUint64 {
// no service safe point or all of them are expired.
return nil, nil
}

// successfully found a valid min safe point.
return min, nil
}
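
The minimum-selection loop above can be tried without etcd. The following is a minimal sketch, not the PR's code: the `ServiceSafePoint` fields are an assumption (the struct is defined elsewhere in the package and is not part of this diff), and `minServiceSafePoint` is a hypothetical helper that takes the raw JSON values a range load would return.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ServiceSafePoint mirrors the type used in the diff. The field set is an
// assumption, since the struct itself is not part of this change.
type ServiceSafePoint struct {
	ServiceID string `json:"service_id"`
	ExpiredAt int64  `json:"expired_at"`
	SafePoint uint64 `json:"safe_point"`
}

// minServiceSafePoint is a hypothetical helper: it decodes every value and
// returns the entry with the smallest SafePoint, or nil when values is empty.
func minServiceSafePoint(values []string) (*ServiceSafePoint, error) {
	var lowest *ServiceSafePoint
	for _, v := range values {
		ssp := &ServiceSafePoint{}
		if err := json.Unmarshal([]byte(v), ssp); err != nil {
			return nil, err
		}
		if lowest == nil || ssp.SafePoint < lowest.SafePoint {
			lowest = ssp
		}
	}
	return lowest, nil
}

func main() {
	values := []string{
		`{"service_id":"cdc","safe_point":300}`,
		`{"service_id":"br","safe_point":200}`,
		`{"service_id":"lightning","safe_point":250}`,
	}
	lowest, err := minServiceSafePoint(values)
	if err != nil {
		panic(err)
	}
	fmt.Println(lowest.ServiceID, lowest.SafePoint) // br 200
}
```

The sketch uses a nil pointer rather than `math.MaxUint64` as the "nothing found" marker, so a stored safe point equal to `math.MaxUint64` would still be returned. Whether that edge case matters in practice is a judgment call for the PR.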

// RemoveServiceSafePoint removes the target ServiceSafePoint.
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
return se.Remove(key)
}

// SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space.
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error {
value := strconv.FormatUint(safePoint, 16)
return se.Save(KeySpaceGCSafePointPath(spaceID), value)
}

// LoadKeySpaceGCSafePoint reads the GCSafePoint for the given key-space.
// Returns 0 if the target safe point does not exist.
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) {
value, err := se.Load(KeySpaceGCSafePointPath(spaceID))
if err != nil || value == "" {
return 0, err
}
safePoint, err := strconv.ParseUint(value, 16, 64)
if err != nil {
return 0, err
}
return safePoint, nil
}
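
The GC safe point is stored as a base-16 string, so anything that reads the key directly has to parse it with the same base. A small sketch of the round trip follows; `encodeSafePoint` and `decodeSafePoint` are hypothetical names that wrap the same `strconv` calls used in the two methods above.

```go
package main

import (
	"fmt"
	"strconv"
)

// encodeSafePoint formats a safe point the way SaveKeySpaceGCSafePoint does.
// The helper name is hypothetical.
func encodeSafePoint(sp uint64) string {
	return strconv.FormatUint(sp, 16)
}

// decodeSafePoint parses a stored value the way LoadKeySpaceGCSafePoint does.
// An empty string (key not found) maps to safe point 0.
func decodeSafePoint(s string) (uint64, error) {
	if s == "" {
		return 0, nil
	}
	return strconv.ParseUint(s, 16, 64)
}

func main() {
	enc := encodeSafePoint(255)
	dec, err := decodeSafePoint(enc)
	fmt.Println(enc, dec, err) // ff 255 <nil>

	// A stored value that looks decimal is still interpreted as hex.
	v, _ := decodeSafePoint("10")
	fmt.Println(v) // 16
}
```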

// LoadAllKeySpaceGCSafePoints returns a slice of KeySpaceGCSafePoint.
// If withGCSafePoint is set to false, the returned safe points will be 0.
func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error) {
prefix := KeySpaceSafePointPrefix()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
suffix := KeySpaceGCSafePointSuffix()
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
safePoints := make([]*KeySpaceGCSafePoint, 0, len(values))
for i := range keys {
// skip non gc safe points
if !strings.HasSuffix(keys[i], suffix) {
continue
}
safePoint := &KeySpaceGCSafePoint{}
spaceID := strings.TrimPrefix(keys[i], prefix)
spaceID = strings.TrimSuffix(spaceID, suffix)
safePoint.SpaceID = spaceID
if withGCSafePoint {
value, err := strconv.ParseUint(values[i], 16, 64)
if err != nil {
return nil, err
}
safePoint.SafePoint = value
}
safePoints = append(safePoints, safePoint)
}
return safePoints, nil
}
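
The space ID is recovered from each key purely by string trimming. The sketch below reproduces that step in isolation, assuming the range load returns keys relative to the root path; `spaceIDFromKey` is a hypothetical helper, and the two constants are the values the prefix and suffix functions in key_path.go return.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	prefix = "key_space/gc_safepoint/" // value of KeySpaceSafePointPrefix()
	suffix = "/gc"                     // value of KeySpaceGCSafePointSuffix()
)

// spaceIDFromKey is a hypothetical helper that applies the same suffix check
// and trimming as the loop in LoadAllKeySpaceGCSafePoints.
func spaceIDFromKey(key string) (string, bool) {
	if !strings.HasSuffix(key, suffix) {
		return "", false
	}
	id := strings.TrimSuffix(strings.TrimPrefix(key, prefix), suffix)
	return id, true
}

func main() {
	keys := []string{
		"key_space/gc_safepoint/1/gc",
		"key_space/gc_safepoint/1/service/cdc",
		"key_space/gc_safepoint/1/service/gc",
	}
	for _, key := range keys {
		id, ok := spaceIDFromKey(key)
		fmt.Printf("%q -> %q %v\n", key, id, ok)
	}
}
```

The third key shows a possible edge case: a service safe point registered under the service ID `gc` also ends in `/gc`, so the suffix check alone would accept it and yield the space ID `1/service`. If such a service ID can occur, a stricter check (for example, rejecting IDs that still contain a `/`) may be worth considering.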
32 changes: 32 additions & 0 deletions server/storage/endpoint/key_path.go
@@ -32,6 +32,8 @@ const (
customScheduleConfigPath = "scheduler_config"
gcWorkerServiceSafePointID = "gc_worker"
minResolvedTS = "min_resolved_ts"
keySpaceSafePointPrefix = "key_space/gc_safepoint"
keySpaceGCSafePointSuffix = "gc"
)

// AppendToRootPath appends the given key to the rootPath.
@@ -104,3 +106,33 @@ func gcSafePointServicePath(serviceID string) string {
func MinResolvedTSPath() string {
return path.Join(clusterPath, minResolvedTS)
}

// KeySpaceServiceSafePointPrefix returns the prefix of the given key-space's service safe points.
// Prefix: /key_space/gc_safepoint/{space_id}/service/
func KeySpaceServiceSafePointPrefix(spaceID string) string {
return path.Join(keySpaceSafePointPrefix, spaceID, "service") + "/"
}

// KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space.
// Path: /key_space/gc_safepoint/{space_id}/gc
func KeySpaceGCSafePointPath(spaceID string) string {
return path.Join(keySpaceSafePointPrefix, spaceID, keySpaceGCSafePointSuffix)
}

// KeySpaceServiceSafePointPath returns the path of given service's service safe point.
// Path: /key_space/gc_safepoint/{space_id}/service/{service_id}
func KeySpaceServiceSafePointPath(spaceID, serviceID string) string {
return path.Join(KeySpaceServiceSafePointPrefix(spaceID), serviceID)
}

// KeySpaceSafePointPrefix returns prefix for all key-spaces' safe points.
// Path: /key_space/gc_safepoint/
func KeySpaceSafePointPrefix() string {
return keySpaceSafePointPrefix + "/"
}

// KeySpaceGCSafePointSuffix returns the suffix for any gc safe point.
// Suffix: /gc
func KeySpaceGCSafePointSuffix() string {
return "/" + keySpaceGCSafePointSuffix
}
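
To see which keys the helpers above produce, here is a standalone sketch that copies their `path.Join` logic under shorter, hypothetical names (`servicePrefix`, `servicePath`, `gcPath`). It also shows why the trailing `/` is appended by hand: `path.Join` cleans its result and drops trailing slashes.

```go
package main

import (
	"fmt"
	"path"
)

const keySpaceSafePointPrefix = "key_space/gc_safepoint"

// servicePrefix mirrors KeySpaceServiceSafePointPrefix.
func servicePrefix(spaceID string) string {
	return path.Join(keySpaceSafePointPrefix, spaceID, "service") + "/"
}

// servicePath mirrors KeySpaceServiceSafePointPath.
func servicePath(spaceID, serviceID string) string {
	return path.Join(servicePrefix(spaceID), serviceID)
}

// gcPath mirrors KeySpaceGCSafePointPath.
func gcPath(spaceID string) string {
	return path.Join(keySpaceSafePointPrefix, spaceID, "gc")
}

func main() {
	fmt.Println(servicePrefix("1"))      // key_space/gc_safepoint/1/service/
	fmt.Println(servicePath("1", "cdc")) // key_space/gc_safepoint/1/service/cdc
	fmt.Println(gcPath("1"))             // key_space/gc_safepoint/1/gc

	// An empty service ID collapses onto the prefix without its slash,
	// which is one reason SaveServiceSafePoint rejects empty IDs.
	fmt.Println(servicePath("1", "")) // key_space/gc_safepoint/1/service
}
```

None of these helpers emit a leading `/`; the leading slash in the doc comments presumably refers to the key's position under the storage root path.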
63 changes: 59 additions & 4 deletions server/storage/kv/etcd_kv.go
@@ -48,18 +48,24 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase {
}

 func (kv *etcdKVBase) Load(key string) (string, error) {
+value, _, err := kv.LoadRevision(key)
+return value, err
+}
+
+// LoadRevision gets a value along with revision.
+func (kv *etcdKVBase) LoadRevision(key string) (string, int64, error) {
 key = path.Join(kv.rootPath, key)

 resp, err := etcdutil.EtcdKVGet(kv.client, key)
 if err != nil {
-return "", err
+return "", RevisionUnavailable, err
 }
 if n := len(resp.Kvs); n == 0 {
-return "", nil
+return "", RevisionUnavailable, nil
 } else if n > 1 {
-return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
+return "", RevisionUnavailable, errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
 }
-return string(resp.Kvs[0].Value), nil
+return string(resp.Kvs[0].Value), resp.Kvs[0].ModRevision, nil
 }

func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
@@ -103,6 +109,55 @@ func (kv *etcdKVBase) Save(key, value string) error {
return nil
}

func (kv *etcdKVBase) GrantLease(ttlSeconds int64) (leaseID clientv3.LeaseID, err error) {
start := time.Now()
ctx, cancel := context.WithTimeout(kv.client.Ctx(), requestTimeout)
grantResp, err := kv.client.Grant(ctx, ttlSeconds)
cancel()
if err != nil {
e := errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause()
log.Error("grant lease meet error",
zap.Int64("ttl", ttlSeconds),
errs.ZapError(e))
return 0, e
}
if cost := time.Since(start); cost > slowRequestTime {
log.Warn("lease grants too slow",
zap.Reflect("response", grantResp),
zap.Duration("cost", cost))
}
log.Info("lease granted",
zap.Int64("lease-id", int64(grantResp.ID)),
zap.Int64("lease-timeout", ttlSeconds))
return grantResp.ID, nil
}

func (kv *etcdKVBase) SaveWithTTL(key, value string, ttlSeconds int64) error {
leaseID, err := kv.GrantLease(ttlSeconds)
Member

Could we use etcdutil.EtcdKVPutWithTTL(context.TODO(),kv.client,key,value,ttlSeconds) directly?

Contributor Author

Hmm, I was trying to put SaveWithTTL into a SlowLogTxn like other update methods for timeout and logging. But I guess that created more complexity.

Should we skip timeout / logging for this operation like other read methods?

if err != nil {
return errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause()
}

key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
resp, err := txn.Then(clientv3.OpPut(key, value, clientv3.WithLease(leaseID))).Commit()
if err != nil {
e := errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause()
log.Error("save to etcd with lease meet error",
zap.String("key", key),
zap.String("value", value),
zap.Int64("lease-id", int64(leaseID)),
errs.ZapError(e),
)
return e
}
if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
return nil
}
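
The review thread above asks whether the grant and the put should share timeout handling. One way to picture the two-step flow is the sketch below, which runs both steps under a single context deadline. It is an illustration only: `leaseClient` and `fakeClient` are hypothetical stand-ins so the example runs without an etcd server, and they are not the `clientv3` API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// leaseClient is a hypothetical stand-in for the two etcd operations that
// SaveWithTTL relies on: granting a lease and putting a key bound to it.
type leaseClient interface {
	Grant(ctx context.Context, ttlSeconds int64) (leaseID int64, err error)
	PutWithLease(ctx context.Context, key, value string, leaseID int64) error
}

// saveWithTTL grants a lease and then writes the key under it, with one
// timeout covering both steps. If the grant fails, nothing is written.
func saveWithTTL(c leaseClient, timeout time.Duration, key, value string, ttlSeconds int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	leaseID, err := c.Grant(ctx, ttlSeconds)
	if err != nil {
		return fmt.Errorf("grant lease: %w", err)
	}
	if err := c.PutWithLease(ctx, key, value, leaseID); err != nil {
		return fmt.Errorf("put with lease %d: %w", leaseID, err)
	}
	return nil
}

// fakeClient is an in-memory test double that records which lease each key
// was written under.
type fakeClient struct {
	nextID    int64
	failGrant bool
	kvs       map[string]int64 // key -> lease ID
}

func (f *fakeClient) Grant(ctx context.Context, ttlSeconds int64) (int64, error) {
	if f.failGrant {
		return 0, errors.New("grant failed")
	}
	f.nextID++
	return f.nextID, nil
}

func (f *fakeClient) PutWithLease(ctx context.Context, key, value string, leaseID int64) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	f.kvs[key] = leaseID
	return nil
}

func main() {
	c := &fakeClient{kvs: map[string]int64{}}
	fmt.Println(saveWithTTL(c, time.Second, "service/cdc", `{"safe_point":100}`, 60))

	c.failGrant = true
	fmt.Println(saveWithTTL(c, time.Second, "service/br", `{"safe_point":200}`, 60))
	fmt.Println(len(c.kvs))
}
```

With a real client, a lease granted just before a failed put is left to expire on its own after the TTL; whether to revoke it eagerly is a separate design decision that this sketch does not address.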

func (kv *etcdKVBase) Remove(key string) error {
key = path.Join(kv.rootPath, key)

6 changes: 6 additions & 0 deletions server/storage/kv/kv.go
@@ -14,10 +14,16 @@

package kv

// RevisionUnavailable is the value of unavailable revision,
// when the kv does not exist (etcd_kv), or is not supported (mem_kv & leveldb_kv).
const RevisionUnavailable = -1

// Base is an abstract interface for load/save pd cluster data.
type Base interface {
Load(key string) (string, error)
LoadRange(key, endKey string, limit int) (keys []string, values []string, err error)
LoadRevision(key string) (string, int64, error)
Member

Revision here is a very specific term tied to etcd. Is there a better way to avoid introducing it into this generic KV interface?

Save(key, value string) error
SaveWithTTL(key, value string, ttlSeconds int64) error
Remove(key string) error
}
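
One common Go answer to the concern raised in the review comment is to keep the generic interface small and expose backend-specific capabilities through a separate, optional interface that callers discover with a type assertion. The sketch below illustrates that pattern under assumed names (`TTLSaver`, `ErrTTLUnsupported`, `memKV`, `leasedKV` are all hypothetical); it is not necessarily the approach this PR ended up taking.

```go
package main

import (
	"errors"
	"fmt"
)

// Base stands in for the generic KV interface, reduced to two methods.
type Base interface {
	Load(key string) (string, error)
	Save(key, value string) error
}

// TTLSaver is a hypothetical optional capability that only some backends
// implement.
type TTLSaver interface {
	SaveWithTTL(key, value string, ttlSeconds int64) error
}

// ErrTTLUnsupported is returned when the backend has no TTL support.
var ErrTTLUnsupported = errors.New("ttl operation not supported by this backend")

// saveWithTTL uses the optional capability when the backend provides it.
func saveWithTTL(kv Base, key, value string, ttlSeconds int64) error {
	if s, ok := kv.(TTLSaver); ok {
		return s.SaveWithTTL(key, value, ttlSeconds)
	}
	return ErrTTLUnsupported
}

// memKV implements only Base.
type memKV struct{ data map[string]string }

func (m *memKV) Load(key string) (string, error) { return m.data[key], nil }

func (m *memKV) Save(key, value string) error {
	m.data[key] = value
	return nil
}

// leasedKV implements Base (via the embedded memKV) and TTLSaver.
type leasedKV struct {
	memKV
	ttls map[string]int64
}

func (l *leasedKV) SaveWithTTL(key, value string, ttlSeconds int64) error {
	l.data[key] = value
	l.ttls[key] = ttlSeconds
	return nil
}

func main() {
	plain := &memKV{data: map[string]string{}}
	leased := &leasedKV{memKV: memKV{data: map[string]string{}}, ttls: map[string]int64{}}

	fmt.Println(saveWithTTL(plain, "k", "v", 10))
	fmt.Println(saveWithTTL(leased, "k", "v", 10), leased.ttls["k"])
}
```

With this shape, backends that cannot support TTLs need no stub method, and the unsupported case surfaces as a single sentinel error at the call site.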
11 changes: 11 additions & 0 deletions server/storage/kv/levedb_kv.go
@@ -49,6 +49,12 @@ func (kv *LevelDBKV) Load(key string) (string, error) {
return string(v), err
}

// LoadRevision gets a value along with revision. The revision is unavailable for `LevelDBKV`.
func (kv *LevelDBKV) LoadRevision(key string) (string, int64, error) {
value, err := kv.Load(key)
return value, RevisionUnavailable, err
}

// LoadRange gets a range of value for a given key range.
func (kv *LevelDBKV) LoadRange(startKey, endKey string, limit int) ([]string, []string, error) {
iter := kv.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil)
@@ -72,6 +78,11 @@ func (kv *LevelDBKV) Save(key, value string) error {
return errors.WithStack(kv.Put([]byte(key), []byte(value), nil))
}

// SaveWithTTL is not supported on LevelDBKV.
func (kv *LevelDBKV) SaveWithTTL(key, value string, ttlSeconds int64) error {
return errors.New("ttl operation not supported on LevelDBKV")
}

// Remove deletes a key-value pair for a given key.
func (kv *LevelDBKV) Remove(key string) error {
return errors.WithStack(kv.Delete([]byte(key), nil))
11 changes: 11 additions & 0 deletions server/storage/kv/mem_kv.go
@@ -51,6 +51,12 @@ func (kv *memoryKV) Load(key string) (string, error) {
return item.(memoryKVItem).value, nil
}

// LoadRevision gets a value along with revision. The revision is unavailable for `memoryKV`.
func (kv *memoryKV) LoadRevision(key string) (string, int64, error) {
value, err := kv.Load(key)
return value, RevisionUnavailable, err
}

func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
failpoint.Inject("withRangeLimit", func(val failpoint.Value) {
rangeLimit, ok := val.(int)
@@ -80,6 +86,11 @@ func (kv *memoryKV) Save(key, value string) error {
return nil
}

// SaveWithTTL is not supported on memoryKV.
func (kv *memoryKV) SaveWithTTL(key, value string, ttlSeconds int64) error {
return errors.New("ttl operation not supported on memoryKV")
}
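
The PR returns an error here, which keeps the in-memory backend simple. For comparison, if tests ever needed expiring keys without etcd, TTL behavior could be emulated with an expiry timestamp and an injectable clock. The following is a hypothetical sketch only (`ttlMemKV`, `fakeClock`, and `maxTTLSeconds` are invented names), not something this change implements.

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// maxTTLSeconds caps the TTL so the conversion to time.Duration cannot overflow.
const maxTTLSeconds = math.MaxInt64 / int64(time.Second)

type ttlItem struct {
	value    string
	expireAt time.Time
}

// ttlMemKV is a hypothetical in-memory store whose entries expire lazily on read.
type ttlMemKV struct {
	mu    sync.Mutex
	now   func() time.Time // injectable clock, so tests need no sleeping
	items map[string]ttlItem
}

func newTTLMemKV(now func() time.Time) *ttlMemKV {
	return &ttlMemKV{now: now, items: map[string]ttlItem{}}
}

// SaveWithTTL stores the value together with its expiry time.
func (kv *ttlMemKV) SaveWithTTL(key, value string, ttlSeconds int64) error {
	if ttlSeconds > maxTTLSeconds {
		ttlSeconds = maxTTLSeconds
	}
	kv.mu.Lock()
	defer kv.mu.Unlock()
	expireAt := kv.now().Add(time.Duration(ttlSeconds) * time.Second)
	kv.items[key] = ttlItem{value: value, expireAt: expireAt}
	return nil
}

// Load returns "" for missing or expired keys and drops expired entries.
func (kv *ttlMemKV) Load(key string) (string, error) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	it, ok := kv.items[key]
	if !ok {
		return "", nil
	}
	if !kv.now().Before(it.expireAt) {
		delete(kv.items, key)
		return "", nil
	}
	return it.value, nil
}

// fakeClock is a manually advanced clock for deterministic examples.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time { return c.t }

func (c *fakeClock) Advance(seconds int64) {
	c.t = c.t.Add(time.Duration(seconds) * time.Second)
}

func main() {
	clk := &fakeClock{}
	kv := newTTLMemKV(clk.Now)
	_ = kv.SaveWithTTL("service/cdc", "100", 10)

	clk.Advance(9)
	v, _ := kv.Load("service/cdc")
	fmt.Printf("%q\n", v) // "100"

	clk.Advance(1)
	v, _ = kv.Load("service/cdc")
	fmt.Printf("%q\n", v) // ""
}
```

Unlike an etcd lease, this emulation removes entries only when they are read, so a range scan would need the same expiry check to stay consistent.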

func (kv *memoryKV) Remove(key string) error {
kv.Lock()
defer kv.Unlock()
1 change: 1 addition & 0 deletions server/storage/storage.go
@@ -39,6 +39,7 @@ type Storage interface {
endpoint.ReplicationStatusStorage
endpoint.GCSafePointStorage
endpoint.MinResolvedTSStorage
endpoint.KeySpaceGCSafePointStorage
}

// NewStorageWithMemoryBackend creates a new storage with memory backend.