Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
# Temporary output of build tools
bazel-*
*.out

# Repomix outputs
repomix*.xml

19 changes: 15 additions & 4 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,13 @@ func (p *podTracker) Capacity() int {
if p.b == nil {
return 1
}
return p.b.Capacity()
capacity := p.b.Capacity()
// Safe conversion: breaker capacity is always reasonable for int
// Check for overflow before conversion
if capacity > 0x7FFFFFFF {
return 0x7FFFFFFF // Return max int32 value
}
return int(capacity)
}

func (p *podTracker) UpdateConcurrency(c int) {
Expand All @@ -118,7 +124,7 @@ func (p *podTracker) Reserve(ctx context.Context) (func(), bool) {
}

type breaker interface {
Capacity() int
Capacity() uint64
Maybe(ctx context.Context, thunk func()) error
UpdateConcurrency(int)
Reserve(ctx context.Context) (func(), bool)
Expand Down Expand Up @@ -721,8 +727,13 @@ func newInfiniteBreaker(logger *zap.SugaredLogger) *infiniteBreaker {
}

// Capacity returns the current capacity of the breaker
func (ib *infiniteBreaker) Capacity() int {
return int(ib.concurrency.Load())
func (ib *infiniteBreaker) Capacity() uint64 {
concurrency := ib.concurrency.Load()
// Safe conversion: concurrency is int32 and we check for non-negative
if concurrency >= 0 {
return uint64(concurrency)
}
return 0
}

func zeroOrOne(x int) int32 {
Expand Down
20 changes: 10 additions & 10 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestThrottlerUpdateCapacity(t *testing.T) {
rt.breaker = newInfiniteBreaker(logger)
}
rt.updateCapacity(tt.capacity)
if got := rt.breaker.Capacity(); got != tt.want {
if got := rt.breaker.Capacity(); got != uint64(tt.want) {
t.Errorf("Capacity = %d, want: %d", got, tt.want)
}
if tt.checkAssignedPod {
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestThrottlerSuccesses(t *testing.T) {
rt.mux.RLock()
defer rt.mux.RUnlock()
if *cc != 0 {
return rt.activatorIndex.Load() != -1 && rt.breaker.Capacity() == wantCapacity &&
return rt.activatorIndex.Load() != -1 && rt.breaker.Capacity() == uint64(wantCapacity) &&
sortedTrackers(rt.assignedTrackers), nil
}
// If CC=0 then verify number of backends, rather the capacity of breaker.
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestPodAssignmentFinite(t *testing.T) {
if got, want := trackerDestSet(rt.assignedTrackers), sets.New("ip0", "ip4"); !got.Equal(want) {
t.Errorf("Assigned trackers = %v, want: %v, diff: %s", got, want, cmp.Diff(want, got))
}
if got, want := rt.breaker.Capacity(), 2*42; got != want {
if got, want := rt.breaker.Capacity(), uint64(2*42); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
if got, want := rt.assignedTrackers[0].Capacity(), 42; got != want {
Expand All @@ -657,7 +657,7 @@ func TestPodAssignmentFinite(t *testing.T) {
if got, want := len(rt.assignedTrackers), 0; got != want {
t.Errorf("NumAssignedTrackers = %d, want: %d", got, want)
}
if got, want := rt.breaker.Capacity(), 0; got != want {
if got, want := rt.breaker.Capacity(), uint64(0); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
}
Expand Down Expand Up @@ -687,7 +687,7 @@ func TestPodAssignmentInfinite(t *testing.T) {
if got, want := len(rt.assignedTrackers), 3; got != want {
t.Errorf("NumAssigned trackers = %d, want: %d", got, want)
}
if got, want := rt.breaker.Capacity(), 1; got != want {
if got, want := rt.breaker.Capacity(), uint64(1); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
if got, want := rt.assignedTrackers[0].Capacity(), 1; got != want {
Expand All @@ -703,7 +703,7 @@ func TestPodAssignmentInfinite(t *testing.T) {
if got, want := len(rt.assignedTrackers), 0; got != want {
t.Errorf("NumAssignedTrackers = %d, want: %d", got, want)
}
if got, want := rt.breaker.Capacity(), 0; got != want {
if got, want := rt.breaker.Capacity(), uint64(0); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
}
Expand Down Expand Up @@ -935,7 +935,7 @@ func TestInfiniteBreaker(t *testing.T) {
}

// Verify initial condition.
if got, want := b.Capacity(), 0; got != want {
if got, want := b.Capacity(), uint64(0); got != want {
t.Errorf("Cap=%d, want: %d", got, want)
}
if _, ok := b.Reserve(context.Background()); ok != true {
Expand All @@ -949,7 +949,7 @@ func TestInfiniteBreaker(t *testing.T) {
}

b.UpdateConcurrency(1)
if got, want := b.Capacity(), 1; got != want {
if got, want := b.Capacity(), uint64(1); got != want {
t.Errorf("Cap=%d, want: %d", got, want)
}

Expand All @@ -976,7 +976,7 @@ func TestInfiniteBreaker(t *testing.T) {
if err := b.Maybe(ctx, nil); err == nil {
t.Error("Should have failed, but didn't")
}
if got, want := b.Capacity(), 0; got != want {
if got, want := b.Capacity(), uint64(0); got != want {
t.Errorf("Cap=%d, want: %d", got, want)
}

Expand Down Expand Up @@ -1212,7 +1212,7 @@ func TestAssignSlice(t *testing.T) {
t.Errorf("Got=%v, want: %v; diff: %s", got, want,
cmp.Diff(want, got, opt))
}
if got, want := got[0].b.Capacity(), 0; got != want {
if got, want := got[0].b.Capacity(), uint64(0); got != want {
t.Errorf("Capacity for the tail pod = %d, want: %d", got, want)
}
})
Expand Down
36 changes: 24 additions & 12 deletions pkg/queue/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type BreakerParams struct {
// executions in excess of the concurrency limit. Function call attempts
// beyond the limit of the queue are failed immediately.
type Breaker struct {
inFlight atomic.Int64
pending atomic.Int64
totalSlots int64
sem *semaphore

Expand Down Expand Up @@ -83,10 +83,10 @@ func NewBreaker(params BreakerParams) *Breaker {
func (b *Breaker) tryAcquirePending() bool {
// This is an atomic version of:
//
// if inFlight == totalSlots {
// if pending == totalSlots {
// return false
// } else {
// inFlight++
// pending++
// return true
// }
//
Expand All @@ -96,19 +96,20 @@ func (b *Breaker) tryAcquirePending() bool {
// (it fails if we're raced to it) or if we don't fulfill the condition
// anymore.
for {
cur := b.inFlight.Load()
cur := b.pending.Load()
// 10000 + containerConcurrency = totalSlots
if cur == b.totalSlots {
return false
}
if b.inFlight.CompareAndSwap(cur, cur+1) {
if b.pending.CompareAndSwap(cur, cur+1) {
return true
}
}
}

// releasePending releases a slot on the pending "queue".
func (b *Breaker) releasePending() {
b.inFlight.Add(-1)
b.pending.Add(-1)
}

// Reserve reserves an execution slot in the breaker, to permit
Expand Down Expand Up @@ -154,9 +155,9 @@ func (b *Breaker) Maybe(ctx context.Context, thunk func()) error {
return nil
}

// InFlight returns the number of requests currently in flight in this breaker.
func (b *Breaker) InFlight() int {
return int(b.inFlight.Load())
// Pending returns the number of requests currently pending to this breaker.
func (b *Breaker) Pending() int {
return int(b.pending.Load())
}

// UpdateConcurrency updates the maximum number of in-flight requests.
Expand All @@ -165,10 +166,15 @@ func (b *Breaker) UpdateConcurrency(size int) {
}

// Capacity returns the number of allowed in-flight requests on this breaker.
func (b *Breaker) Capacity() int {
func (b *Breaker) Capacity() uint64 {
return b.sem.Capacity()
}

// InFlight returns the number of requests currently in-flight on this breaker.
func (b *Breaker) InFlight() uint64 {
return b.sem.InFlight()
}

// newSemaphore creates a semaphore with the desired initial capacity.
func newSemaphore(maxCapacity, initialCapacity int) *semaphore {
queue := make(chan struct{}, maxCapacity)
Expand Down Expand Up @@ -288,9 +294,15 @@ func (s *semaphore) updateCapacity(size int) {
}

// Capacity is the capacity of the semaphore.
func (s *semaphore) Capacity() int {
func (s *semaphore) Capacity() uint64 {
capacity, _ := unpack(s.state.Load())
return int(capacity) //nolint:gosec // TODO(dprotaso) - capacity should be uint64
return capacity
}

// InFlight is the number of the inflight requests of the semaphore.
func (s *semaphore) InFlight() uint64 {
_, inFlight := unpack(s.state.Load())
return inFlight
}

// unpack takes an uint64 and returns two uint32 (as uint64) comprised of the leftmost
Expand Down
8 changes: 4 additions & 4 deletions pkg/queue/breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,12 @@ func TestBreakerUpdateConcurrency(t *testing.T) {
params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 0}
b := NewBreaker(params)
b.UpdateConcurrency(1)
if got, want := b.Capacity(), 1; got != want {
if got, want := b.Capacity(), uint64(1); got != want {
t.Errorf("Capacity() = %d, want: %d", got, want)
}

b.UpdateConcurrency(0)
if got, want := b.Capacity(), 0; got != want {
if got, want := b.Capacity(), uint64(0); got != want {
t.Errorf("Capacity() = %d, want: %d", got, want)
}
}
Expand Down Expand Up @@ -294,12 +294,12 @@ func TestSemaphoreRelease(t *testing.T) {
func TestSemaphoreUpdateCapacity(t *testing.T) {
const initialCapacity = 1
sem := newSemaphore(3, initialCapacity)
if got, want := sem.Capacity(), 1; got != want {
if got, want := sem.Capacity(), uint64(1); got != want {
t.Errorf("Capacity = %d, want: %d", got, want)
}
sem.acquire(context.Background())
sem.updateCapacity(initialCapacity + 2)
if got, want := sem.Capacity(), 3; got != want {
if got, want := sem.Capacity(), uint64(3); got != want {
t.Errorf("Capacity = %d, want: %d", got, want)
}
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/queue/drain/signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package drain

const (
// SignalDirectory is the directory where drain signal files are created
SignalDirectory = "/var/run/knative"

// DrainStartedFile indicates that pod termination has begun and queue-proxy is handling shutdown
DrainStartedFile = SignalDirectory + "/drain-started"

// DrainCompleteFile indicates that queue-proxy has finished draining requests
DrainCompleteFile = SignalDirectory + "/drain-complete"

// DrainCheckInterval is how often to check for drain completion
DrainCheckInterval = "1" // seconds

// ExponentialBackoffDelays are the delays in seconds for checking drain-started file
// Total max wait time: 1+2+4+8 = 15 seconds
ExponentialBackoffDelays = "1 2 4 8"
)

// BuildDrainWaitScript generates the shell script for waiting on drain signals.
// If existingCommand is provided, it will be executed before the drain wait.
func BuildDrainWaitScript(existingCommand string) string {
drainLogic := `for i in ` + ExponentialBackoffDelays + `; do ` +
` if [ -f ` + DrainStartedFile + ` ]; then ` +
` until [ -f ` + DrainCompleteFile + ` ]; do sleep ` + DrainCheckInterval + `; done; ` +
` exit 0; ` +
` fi; ` +
` sleep $i; ` +
`done; ` +
`exit 0`

if existingCommand != "" {
return existingCommand + "; " + drainLogic
}
return drainLogic
}

// QueueProxyPreStopScript is the script executed by queue-proxy's PreStop hook
const QueueProxyPreStopScript = "touch " + DrainStartedFile
Loading
Loading