Skip to content

Commit

Permalink
Merge pull request #16122 from serathius/robustness-etcd-range
Browse files Browse the repository at this point in the history
tests/robustness: Add List and StaleList requests to etcd traffic
  • Loading branch information
serathius authored Jun 21, 2023
2 parents 66553d4 + 9fc438c commit 54fab1c
Showing 1 changed file with 38 additions and 15 deletions.
53 changes: 38 additions & 15 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ var (
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
requests: []choiceWeight[etcdRequestType]{
{choice: Get, weight: 25},
{choice: StaleGet, weight: 25},
{choice: Get, weight: 15},
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 23},
{choice: LargePut, weight: 2},
{choice: Delete, weight: 5},
Expand All @@ -60,8 +62,10 @@ var (
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
requests: []choiceWeight[etcdRequestType]{
{choice: Get, weight: 25},
{choice: StaleGet, weight: 25},
{choice: Get, weight: 15},
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 40},
{choice: MultiOpTxn, weight: 5},
{choice: LargePut, weight: 5},
Expand All @@ -86,6 +90,8 @@ type etcdRequestType string
const (
Get etcdRequestType = "get"
StaleGet etcdRequestType = "staleGet"
List etcdRequestType = "list"
StaleList etcdRequestType = "staleList"
Put etcdRequestType = "put"
LargePut etcdRequestType = "largePut"
Delete etcdRequestType = "delete"
Expand All @@ -102,6 +108,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
var requestType etcdRequestType
client := etcdTrafficClient{
etcdTraffic: t,
keyPrefix: "key",
client: c,
limiter: limiter,
idProvider: ids,
Expand All @@ -115,14 +122,13 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
return
default:
}
key := fmt.Sprintf("%d", rand.Int()%t.keyCount)
// Avoid multiple failed writes in a row
if lastOperationSucceeded {
requestType = pickRandom(t.requests)
} else {
requestType = Get
}
rev, err := client.Request(ctx, requestType, key, lastRev)
rev, err := client.Request(ctx, requestType, lastRev)
lastOperationSucceeded = err == nil
if err != nil {
continue
Expand All @@ -136,35 +142,49 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.

type etcdTrafficClient struct {
etcdTraffic
keyPrefix string
client *RecordingClient
limiter *rate.Limiter
idProvider identity.Provider
leaseStorage identity.LeaseIdStorage
}

func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, key string, lastRev int64) (rev int64, err error) {
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, lastRev int64) (rev int64, err error) {
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel()

switch request {
case StaleGet:
_, rev, err = c.client.Get(opCtx, key, lastRev)
_, rev, err = c.client.Get(opCtx, c.randomKey(), lastRev)
case Get:
_, rev, err = c.client.Get(opCtx, key, 0)
_, rev, err = c.client.Get(opCtx, c.randomKey(), 0)
case List:
var resp *clientv3.GetResponse
resp, err = c.client.Range(opCtx, c.keyPrefix, true, 0)
if resp != nil {
rev = resp.Header.Revision
}
case StaleList:
var resp *clientv3.GetResponse
resp, err = c.client.Range(opCtx, c.keyPrefix, true, lastRev)
if resp != nil {
rev = resp.Header.Revision
}
case Put:
var resp *clientv3.PutResponse
resp, err = c.client.Put(opCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))
resp, err = c.client.Put(opCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestId()))
if resp != nil {
rev = resp.Header.Revision
}
case LargePut:
var resp *clientv3.PutResponse
resp, err = c.client.Put(opCtx, key, randString(c.largePutSize))
resp, err = c.client.Put(opCtx, c.randomKey(), randString(c.largePutSize))
if resp != nil {
rev = resp.Header.Revision
}
case Delete:
var resp *clientv3.DeleteResponse
resp, err = c.client.Delete(opCtx, key)
resp, err = c.client.Delete(opCtx, c.randomKey())
if resp != nil {
rev = resp.Header.Revision
}
Expand All @@ -174,9 +194,9 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if resp != nil {
rev = resp.Header.Revision
}

case CompareAndSet:
var kv *mvccpb.KeyValue
key := c.randomKey()
kv, rev, err = c.client.Get(opCtx, key, 0)
if err == nil {
c.limiter.Wait(ctx)
Expand Down Expand Up @@ -209,7 +229,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.PutResponse
resp, err = c.client.PutWithLease(putCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
resp, err = c.client.PutWithLease(putCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
putCancel()
if resp != nil {
rev = resp.Header.Revision
Expand Down Expand Up @@ -237,7 +257,6 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
default:
panic("invalid choice")
}
cancel()
return rev, err
}

Expand Down Expand Up @@ -274,6 +293,10 @@ func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
return ops
}

func (c etcdTrafficClient) randomKey() string {
return fmt.Sprintf("%s%d", c.keyPrefix, rand.Int()%c.keyCount)
}

func (t etcdTraffic) pickOperationType() model.OperationType {
roll := rand.Int() % 100
if roll < 10 {
Expand Down

0 comments on commit 54fab1c

Please sign in to comment.