Skip to content

Commit

Permalink
tests/robustness: Prevent to many concurrent non-unique writes which …
Browse files Browse the repository at this point in the history
…are causing linearization to timeout

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jul 3, 2023
1 parent 57a583d commit 8fca6eb
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 26 deletions.
38 changes: 28 additions & 10 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (

var (
LowTraffic = Config{
Name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
Name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
maxNonUniqueRequestConcurrency: 3,
Traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
Expand All @@ -53,10 +54,11 @@ var (
},
}
HighTraffic = Config{
Name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
Name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
maxNonUniqueRequestConcurrency: 3,
Traffic: etcdTraffic{
keyCount: 10,
largePutSize: 32769,
Expand Down Expand Up @@ -102,7 +104,7 @@ const (
Defragment etcdRequestType = "defragment"
)

func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
lastOperationSucceeded := true
var lastRev int64
var requestType etcdRequestType
Expand All @@ -124,11 +126,18 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
}
// Avoid multiple failed writes in a row
if lastOperationSucceeded {
requestType = pickRandom(t.requests)
choices := t.requests
if !nonUniqueWriteLimiter.Take() {
choices = filterOutNonUniqueEtcdWrites(choices)
}
requestType = pickRandom(choices)
} else {
requestType = Get
}
rev, err := client.Request(ctx, requestType, lastRev)
if requestType == Delete || requestType == LeaseRevoke {
nonUniqueWriteLimiter.Return()
}
lastOperationSucceeded = err == nil
if err != nil {
continue
Expand All @@ -140,6 +149,15 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
}
}

func filterOutNonUniqueEtcdWrites(choices []choiceWeight[etcdRequestType]) (resp []choiceWeight[etcdRequestType]) {
for _, choice := range choices {
if choice.choice != Delete && choice.choice != LeaseRevoke {
resp = append(resp, choice)
}
}
return resp
}

type etcdTrafficClient struct {
etcdTraffic
keyPrefix string
Expand Down
34 changes: 25 additions & 9 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (

var (
KubernetesTraffic = Config{
Name: "Kubernetes",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
Name: "Kubernetes",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
maxNonUniqueRequestConcurrency: 3,
Traffic: kubernetesTraffic{
averageKeyCount: 10,
resource: "pods",
Expand All @@ -60,7 +61,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
return true
}

func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
kc := &kubernetesClient{client: c}
s := newStorage()
keyPrefix := "/registry/" + t.resource + "/"
Expand Down Expand Up @@ -99,7 +100,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
continue
}
}
err := t.Write(ctx, kc, ids, s, limiter)
err := t.Write(ctx, kc, ids, s, limiter, nonUniqueWriteLimiter)
lastWriteFailed = err != nil
if err != nil {
continue
Expand Down Expand Up @@ -140,7 +141,7 @@ func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *st
return revision, nil
}

func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter) (err error) {
func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter, nonUniqueWriteLimiter ConcurrencyLimiter) (err error) {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel()
count := s.Count()
Expand All @@ -151,13 +152,19 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
if rev == 0 {
return errors.New("storage empty")
}
if count > t.averageKeyCount*3/2 {
if count > t.averageKeyCount*3/2 && nonUniqueWriteLimiter.Take() {
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
} else {
op := pickRandom(t.writeChoices)
choices := t.writeChoices
if !nonUniqueWriteLimiter.Take() {
choices = filterOutNonUniqueKuberntesWrites(t.writeChoices)
}
op := pickRandom(choices)
switch op {
case KubernetesDelete:
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
case KubernetesUpdate:
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
case KubernetesCreate:
Expand All @@ -174,6 +181,15 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
return nil
}

func filterOutNonUniqueKuberntesWrites(choices []choiceWeight[KubernetesRequestType]) (resp []choiceWeight[KubernetesRequestType]) {
for _, choice := range choices {
if choice.choice != KubernetesDelete {
resp = append(resp, choice)
}
}
return resp
}

func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
defer cancel()
Expand Down
47 changes: 47 additions & 0 deletions tests/robustness/traffic/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package traffic

func NewConcurrencyLimiter(size int) ConcurrencyLimiter {
return &concurrencyLimiter{
ch: make(chan struct{}, size),
}
}

type ConcurrencyLimiter interface {
Take() bool
Return()
}

type concurrencyLimiter struct {
ch chan struct{}
}

func (c *concurrencyLimiter) Take() bool {
select {
case c.ch <- struct{}{}:
return true
default:
return false
}
}

func (c *concurrencyLimiter) Return() {
select {
case _ = <-c.ch:
default:
panic("Call to Return() without a successful Take")
}
}
61 changes: 61 additions & 0 deletions tests/robustness/traffic/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package traffic

import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)

func TestLimiter(t *testing.T) {
limiter := NewConcurrencyLimiter(3)
counter := &atomic.Int64{}
g := errgroup.Group{}
for i := 0; i < 10; i++ {
g.Go(func() error {
if limiter.Take() {
counter.Add(1)
}
return nil
})
}
g.Wait()
assert.Equal(t, 3, int(counter.Load()))
assert.False(t, limiter.Take())

limiter.Return()
counter.Store(0)
for i := 0; i < 10; i++ {
g.Go(func() error {
if limiter.Take() {
counter.Add(1)
}
return nil
})
}
g.Wait()
assert.Equal(t, 1, int(counter.Load()))
assert.False(t, limiter.Take())

limiter.Return()
limiter.Return()
limiter.Return()
assert.Panics(t, func() {
limiter.Return()
})
}
16 changes: 9 additions & 7 deletions tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}
defer cc.Close()
wg := sync.WaitGroup{}
nonUniqueWriteLimiter := NewConcurrencyLimiter(config.maxNonUniqueRequestConcurrency)
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime)
Expand All @@ -61,7 +62,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
defer wg.Done()
defer c.Close()

config.Traffic.Run(ctx, c, limiter, ids, lm, finish)
config.Traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
mux.Lock()
reports = append(reports, c.Report())
mux.Unlock()
Expand Down Expand Up @@ -93,14 +94,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}

type Config struct {
Name string
minimalQPS float64
maximalQPS float64
clientCount int
Traffic Traffic
Name string
minimalQPS float64
maximalQPS float64
maxNonUniqueRequestConcurrency int
clientCount int
Traffic Traffic
}

type Traffic interface {
Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
ExpectUniqueRevision() bool
}

0 comments on commit 8fca6eb

Please sign in to comment.