diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 66bf876fe51..714ddac3ce4 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -29,10 +29,11 @@ import ( var ( LowTraffic = Config{ - Name: "LowTraffic", - minimalQPS: 100, - maximalQPS: 200, - clientCount: 8, + Name: "LowTraffic", + minimalQPS: 100, + maximalQPS: 200, + clientCount: 8, + maxNonUniqueRequestConcurrency: 3, Traffic: etcdTraffic{ keyCount: 10, leaseTTL: DefaultLeaseTTL, @@ -53,10 +54,11 @@ var ( }, } HighTraffic = Config{ - Name: "HighTraffic", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, + Name: "HighTraffic", + minimalQPS: 200, + maximalQPS: 1000, + clientCount: 12, + maxNonUniqueRequestConcurrency: 3, Traffic: etcdTraffic{ keyCount: 10, largePutSize: 32769, @@ -102,7 +104,7 @@ const ( Defragment etcdRequestType = "defragment" ) -func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { +func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { lastOperationSucceeded := true var lastRev int64 var requestType etcdRequestType @@ -124,11 +126,18 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate. } // Avoid multiple failed writes in a row if lastOperationSucceeded { - requestType = pickRandom(t.requests) + choices := t.requests + if !nonUniqueWriteLimiter.Take() { + choices = filterOutNonUniqueEtcdWrites(choices) + } + requestType = pickRandom(choices) } else { requestType = Get } rev, err := client.Request(ctx, requestType, lastRev) + if requestType == Delete || requestType == LeaseRevoke { + nonUniqueWriteLimiter.Return() + } lastOperationSucceeded = err == nil if err != nil { continue @@ -140,6 +149,15 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate. } } +func filterOutNonUniqueEtcdWrites(choices []choiceWeight[etcdRequestType]) (resp []choiceWeight[etcdRequestType]) { + for _, choice := range choices { + if choice.choice != Delete && choice.choice != LeaseRevoke { + resp = append(resp, choice) + } + } + return resp +} + type etcdTrafficClient struct { etcdTraffic keyPrefix string diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 4e7319444e4..3d0251d8a81 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -32,10 +32,11 @@ import ( var ( KubernetesTraffic = Config{ - Name: "Kubernetes", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, + Name: "Kubernetes", + minimalQPS: 200, + maximalQPS: 1000, + clientCount: 12, + maxNonUniqueRequestConcurrency: 3, Traffic: kubernetesTraffic{ averageKeyCount: 10, resource: "pods", @@ -60,7 +61,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool { return true } -func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { +func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { kc := &kubernetesClient{client: c} s := newStorage() keyPrefix := "/registry/" + t.resource + "/" @@ -99,7 +100,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter continue } } - err := t.Write(ctx, kc, ids, s, limiter) + err := t.Write(ctx, kc, ids, s, limiter, nonUniqueWriteLimiter) lastWriteFailed = err != nil if err != nil { continue @@ -140,7 +141,7 @@ func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *st return revision, nil } -func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter) (err error) { +func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter, nonUniqueWriteLimiter ConcurrencyLimiter) (err error) { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) defer cancel() count := s.Count() @@ -151,13 +152,19 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids if rev == 0 { return errors.New("storage empty") } - if count > t.averageKeyCount*3/2 { + if count > t.averageKeyCount*3/2 && nonUniqueWriteLimiter.Take() { _, err = kc.OptimisticDelete(writeCtx, key, rev) + nonUniqueWriteLimiter.Return() } else { - op := pickRandom(t.writeChoices) + choices := t.writeChoices + if !nonUniqueWriteLimiter.Take() { + choices = filterOutNonUniqueKuberntesWrites(t.writeChoices) + } + op := pickRandom(choices) switch op { case KubernetesDelete: _, err = kc.OptimisticDelete(writeCtx, key, rev) + nonUniqueWriteLimiter.Return() case KubernetesUpdate: _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev) case KubernetesCreate: @@ -174,6 +181,15 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids return nil } +func filterOutNonUniqueKuberntesWrites(choices []choiceWeight[KubernetesRequestType]) (resp []choiceWeight[KubernetesRequestType]) { + for _, choice := range choices { + if choice.choice != KubernetesDelete { + resp = append(resp, choice) + } + } + return resp +} + func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) { watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout) defer cancel() diff --git a/tests/robustness/traffic/limiter.go b/tests/robustness/traffic/limiter.go new file mode 100644 index 00000000000..d5db965d5b7 --- /dev/null +++ b/tests/robustness/traffic/limiter.go @@ -0,0 +1,47 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traffic + +func NewConcurrencyLimiter(size int) ConcurrencyLimiter { + return &concurrencyLimiter{ + ch: make(chan struct{}, size), + } +} + +type ConcurrencyLimiter interface { + Take() bool + Return() +} + +type concurrencyLimiter struct { + ch chan struct{} +} + +func (c *concurrencyLimiter) Take() bool { + select { + case c.ch <- struct{}{}: + return true + default: + return false + } +} + +func (c *concurrencyLimiter) Return() { + select { + case _ = <-c.ch: + default: + panic("Call to Return() without a successful Take") + } +} diff --git a/tests/robustness/traffic/limiter_test.go b/tests/robustness/traffic/limiter_test.go new file mode 100644 index 00000000000..ef3ead7444d --- /dev/null +++ b/tests/robustness/traffic/limiter_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traffic + +import ( + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" +) + +func TestLimiter(t *testing.T) { + limiter := NewConcurrencyLimiter(3) + counter := &atomic.Int64{} + g := errgroup.Group{} + for i := 0; i < 10; i++ { + g.Go(func() error { + if limiter.Take() { + counter.Add(1) + } + return nil + }) + } + g.Wait() + assert.Equal(t, 3, int(counter.Load())) + assert.False(t, limiter.Take()) + + limiter.Return() + counter.Store(0) + for i := 0; i < 10; i++ { + g.Go(func() error { + if limiter.Take() { + counter.Add(1) + } + return nil + }) + } + g.Wait() + assert.Equal(t, 1, int(counter.Load())) + assert.False(t, limiter.Take()) + + limiter.Return() + limiter.Return() + limiter.Return() + assert.Panics(t, func() { + limiter.Return() + }) +} diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index a257d6d647f..54c3b02a9fd 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -51,6 +51,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 } defer cc.Close() wg := sync.WaitGroup{} + nonUniqueWriteLimiter := NewConcurrencyLimiter(config.maxNonUniqueRequestConcurrency) for i := 0; i < config.clientCount; i++ { wg.Add(1) c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) @@ -61,7 +62,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 defer wg.Done() defer c.Close() - config.Traffic.Run(ctx, c, limiter, ids, lm, finish) + config.Traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish) mux.Lock() reports = append(reports, c.Report()) mux.Unlock() @@ -93,14 +94,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 } type Config struct { - Name string - minimalQPS float64 - maximalQPS float64 - clientCount int - Traffic Traffic + Name string + minimalQPS float64 + maximalQPS float64 + maxNonUniqueRequestConcurrency int + clientCount int + Traffic Traffic } type Traffic interface { - Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) + Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) ExpectUniqueRevision() bool }