Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@
# Temporary output of build tools
bazel-*
*.out

# Repomix outputs
repomix*.xml
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
golang.org/x/sys v0.36.0
golang.org/x/time v0.10.0
google.golang.org/api v0.198.0
google.golang.org/grpc v1.74.2
google.golang.org/grpc v1.75.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.33.4
k8s.io/apiextensions-apiserver v0.33.4
Expand Down Expand Up @@ -153,8 +153,8 @@ require (
golang.org/x/text v0.29.0 // indirect
golang.org/x/tools v0.37.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c // indirect
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
15 changes: 8 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,9 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.5.0 h1:JELs8RLM12qJGXU4u/TO3V25KW8GreMKl9pdkk14RM0=
gomodules.xyz/jsonpatch/v2 v2.5.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
google.golang.org/api v0.198.0 h1:OOH5fZatk57iN0A7tjJQzt6aPfYQ1JiWkt1yGseazks=
google.golang.org/api v0.198.0/go.mod h1:/Lblzl3/Xqqk9hw/yS97TImKTUwnf1bv89v7+OagJzc=
Expand All @@ -517,17 +518,17 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 h1:oWVWY3NzT7KJppx2UKhKmzPq4SRe0LdCijVRwvGeikY=
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822/go.mod h1:h3c4v36UTKzUiuaOKQ6gr3S+0hovBtUrXzTG/i3+XEc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 h1:FiusG7LWj+4byqhbvmB+Q93B/mOxJLN2DTozDuZm4EU=
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:kXqgZtrWaf6qS3jZOCnCH7WYfrvFjkC51bM8fz3RsCA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c h1:qXWI/sQtv5UKboZ/zUk7h+mrf/lXORyI+n9DKDAusdg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
19 changes: 15 additions & 4 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,13 @@ func (p *podTracker) Capacity() int {
if p.b == nil {
return 1
}
return p.b.Capacity()
capacity := p.b.Capacity()
// Safe conversion: breaker capacity is always reasonable for int
// Check for overflow before conversion
if capacity > 0x7FFFFFFF {
return 0x7FFFFFFF // Return max int32 value
}
Comment on lines +106 to +108
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't support 32bit architectures - so int == int64

return int(capacity)
}

func (p *podTracker) UpdateConcurrency(c int) {
Expand All @@ -118,7 +124,7 @@ func (p *podTracker) Reserve(ctx context.Context) (func(), bool) {
}

type breaker interface {
Capacity() int
Capacity() uint64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything motivating this change? This seems unrelated to web socket drain?

I've also tried something like this before but you end up with a lot of casting that I didn't feel like it was worth it because int==int64 on the arch's we support

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I just changed it because of a TODO comment for changing the type, figured I would just get it done. I can revert if you feel as though it is not worth the trouble

Maybe(ctx context.Context, thunk func()) error
UpdateConcurrency(int)
Reserve(ctx context.Context) (func(), bool)
Expand Down Expand Up @@ -721,8 +727,13 @@ func newInfiniteBreaker(logger *zap.SugaredLogger) *infiniteBreaker {
}

// Capacity returns the current capacity of the breaker
func (ib *infiniteBreaker) Capacity() int {
return int(ib.concurrency.Load())
func (ib *infiniteBreaker) Capacity() uint64 {
concurrency := ib.concurrency.Load()
// Safe conversion: concurrency is int32 and we check for non-negative
if concurrency >= 0 {
return uint64(concurrency)
}
return 0
Comment on lines +731 to +736
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

infinite breaker is only really 0 or 1 so I don't think we need this extra conversion check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter was complaining which is why I did this.

}

func zeroOrOne(x int) int32 {
Expand Down
20 changes: 10 additions & 10 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestThrottlerUpdateCapacity(t *testing.T) {
rt.breaker = newInfiniteBreaker(logger)
}
rt.updateCapacity(tt.capacity)
if got := rt.breaker.Capacity(); got != tt.want {
if got := rt.breaker.Capacity(); got != uint64(tt.want) {
t.Errorf("Capacity = %d, want: %d", got, tt.want)
}
if tt.checkAssignedPod {
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestThrottlerSuccesses(t *testing.T) {
rt.mux.RLock()
defer rt.mux.RUnlock()
if *cc != 0 {
return rt.activatorIndex.Load() != -1 && rt.breaker.Capacity() == wantCapacity &&
return rt.activatorIndex.Load() != -1 && rt.breaker.Capacity() == uint64(wantCapacity) &&
sortedTrackers(rt.assignedTrackers), nil
}
// If CC=0 then verify number of backends, rather the capacity of breaker.
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestPodAssignmentFinite(t *testing.T) {
if got, want := trackerDestSet(rt.assignedTrackers), sets.New("ip0", "ip4"); !got.Equal(want) {
t.Errorf("Assigned trackers = %v, want: %v, diff: %s", got, want, cmp.Diff(want, got))
}
if got, want := rt.breaker.Capacity(), 2*42; got != want {
if got, want := rt.breaker.Capacity(), uint64(2*42); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
if got, want := rt.assignedTrackers[0].Capacity(), 42; got != want {
Expand All @@ -657,7 +657,7 @@ func TestPodAssignmentFinite(t *testing.T) {
if got, want := len(rt.assignedTrackers), 0; got != want {
t.Errorf("NumAssignedTrackers = %d, want: %d", got, want)
}
if got, want := rt.breaker.Capacity(), 0; got != want {
if got, want := rt.breaker.Capacity(), uint64(0); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
}
Expand Down Expand Up @@ -687,7 +687,7 @@ func TestPodAssignmentInfinite(t *testing.T) {
if got, want := len(rt.assignedTrackers), 3; got != want {
t.Errorf("NumAssigned trackers = %d, want: %d", got, want)
}
if got, want := rt.breaker.Capacity(), 1; got != want {
if got, want := rt.breaker.Capacity(), uint64(1); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
if got, want := rt.assignedTrackers[0].Capacity(), 1; got != want {
Expand All @@ -703,7 +703,7 @@ func TestPodAssignmentInfinite(t *testing.T) {
if got, want := len(rt.assignedTrackers), 0; got != want {
t.Errorf("NumAssignedTrackers = %d, want: %d", got, want)
}
if got, want := rt.breaker.Capacity(), 0; got != want {
if got, want := rt.breaker.Capacity(), uint64(0); got != want {
t.Errorf("TotalCapacity = %d, want: %d", got, want)
}
}
Expand Down Expand Up @@ -935,7 +935,7 @@ func TestInfiniteBreaker(t *testing.T) {
}

// Verify initial condition.
if got, want := b.Capacity(), 0; got != want {
if got, want := b.Capacity(), uint64(0); got != want {
t.Errorf("Cap=%d, want: %d", got, want)
}
if _, ok := b.Reserve(context.Background()); ok != true {
Expand All @@ -949,7 +949,7 @@ func TestInfiniteBreaker(t *testing.T) {
}

b.UpdateConcurrency(1)
if got, want := b.Capacity(), 1; got != want {
if got, want := b.Capacity(), uint64(1); got != want {
t.Errorf("Cap=%d, want: %d", got, want)
}

Expand All @@ -976,7 +976,7 @@ func TestInfiniteBreaker(t *testing.T) {
if err := b.Maybe(ctx, nil); err == nil {
t.Error("Should have failed, but didn't")
}
if got, want := b.Capacity(), 0; got != want {
if got, want := b.Capacity(), uint64(0); got != want {
t.Errorf("Cap=%d, want: %d", got, want)
}

Expand Down Expand Up @@ -1212,7 +1212,7 @@ func TestAssignSlice(t *testing.T) {
t.Errorf("Got=%v, want: %v; diff: %s", got, want,
cmp.Diff(want, got, opt))
}
if got, want := got[0].b.Capacity(), 0; got != want {
if got, want := got[0].b.Capacity(), uint64(0); got != want {
t.Errorf("Capacity for the tail pod = %d, want: %d", got, want)
}
})
Expand Down
36 changes: 24 additions & 12 deletions pkg/queue/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type BreakerParams struct {
// executions in excess of the concurrency limit. Function call attempts
// beyond the limit of the queue are failed immediately.
type Breaker struct {
inFlight atomic.Int64
pending atomic.Int64
totalSlots int64
sem *semaphore

Expand Down Expand Up @@ -83,10 +83,10 @@ func NewBreaker(params BreakerParams) *Breaker {
func (b *Breaker) tryAcquirePending() bool {
// This is an atomic version of:
//
// if inFlight == totalSlots {
// if pending == totalSlots {
// return false
// } else {
// inFlight++
// pending++
// return true
// }
//
Expand All @@ -96,19 +96,20 @@ func (b *Breaker) tryAcquirePending() bool {
// (it fails if we're raced to it) or if we don't fulfill the condition
// anymore.
for {
cur := b.inFlight.Load()
cur := b.pending.Load()
// 10000 + containerConcurrency = totalSlots
if cur == b.totalSlots {
return false
}
if b.inFlight.CompareAndSwap(cur, cur+1) {
if b.pending.CompareAndSwap(cur, cur+1) {
return true
}
}
}

// releasePending releases a slot on the pending "queue".
func (b *Breaker) releasePending() {
b.inFlight.Add(-1)
b.pending.Add(-1)
}

// Reserve reserves an execution slot in the breaker, to permit
Expand Down Expand Up @@ -154,9 +155,9 @@ func (b *Breaker) Maybe(ctx context.Context, thunk func()) error {
return nil
}

// InFlight returns the number of requests currently in flight in this breaker.
func (b *Breaker) InFlight() int {
return int(b.inFlight.Load())
// Pending returns the number of requests currently pending to this breaker.
func (b *Breaker) Pending() int {
return int(b.pending.Load())
}

// UpdateConcurrency updates the maximum number of in-flight requests.
Expand All @@ -165,10 +166,15 @@ func (b *Breaker) UpdateConcurrency(size int) {
}

// Capacity returns the number of allowed in-flight requests on this breaker.
func (b *Breaker) Capacity() int {
func (b *Breaker) Capacity() uint64 {
return b.sem.Capacity()
}

// InFlight returns the number of requests currently in-flight on this breaker.
func (b *Breaker) InFlight() uint64 {
return b.sem.InFlight()
}

// newSemaphore creates a semaphore with the desired initial capacity.
func newSemaphore(maxCapacity, initialCapacity int) *semaphore {
queue := make(chan struct{}, maxCapacity)
Expand Down Expand Up @@ -288,9 +294,15 @@ func (s *semaphore) updateCapacity(size int) {
}

// Capacity is the capacity of the semaphore.
func (s *semaphore) Capacity() int {
func (s *semaphore) Capacity() uint64 {
capacity, _ := unpack(s.state.Load())
return int(capacity) //nolint:gosec // TODO(dprotaso) - capacity should be uint64
return capacity
}

// InFlight is the number of the inflight requests of the semaphore.
func (s *semaphore) InFlight() uint64 {
_, inFlight := unpack(s.state.Load())
return inFlight
}

// unpack takes an uint64 and returns two uint32 (as uint64) comprised of the leftmost
Expand Down
8 changes: 4 additions & 4 deletions pkg/queue/breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,12 @@ func TestBreakerUpdateConcurrency(t *testing.T) {
params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 0}
b := NewBreaker(params)
b.UpdateConcurrency(1)
if got, want := b.Capacity(), 1; got != want {
if got, want := b.Capacity(), uint64(1); got != want {
t.Errorf("Capacity() = %d, want: %d", got, want)
}

b.UpdateConcurrency(0)
if got, want := b.Capacity(), 0; got != want {
if got, want := b.Capacity(), uint64(0); got != want {
t.Errorf("Capacity() = %d, want: %d", got, want)
}
}
Expand Down Expand Up @@ -294,12 +294,12 @@ func TestSemaphoreRelease(t *testing.T) {
func TestSemaphoreUpdateCapacity(t *testing.T) {
const initialCapacity = 1
sem := newSemaphore(3, initialCapacity)
if got, want := sem.Capacity(), 1; got != want {
if got, want := sem.Capacity(), uint64(1); got != want {
t.Errorf("Capacity = %d, want: %d", got, want)
}
sem.acquire(context.Background())
sem.updateCapacity(initialCapacity + 2)
if got, want := sem.Capacity(), 3; got != want {
if got, want := sem.Capacity(), uint64(3); got != want {
t.Errorf("Capacity = %d, want: %d", got, want)
}
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/queue/drain/signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package drain

const (
// SignalDirectory is the directory where drain signal files are created
SignalDirectory = "/var/run/knative"

// DrainStartedFile indicates that pod termination has begun and queue-proxy is handling shutdown
DrainStartedFile = SignalDirectory + "/drain-started"

// DrainCompleteFile indicates that queue-proxy has finished draining requests
DrainCompleteFile = SignalDirectory + "/drain-complete"

// DrainCheckInterval is how often to check for drain completion
DrainCheckInterval = "1" // seconds

// ExponentialBackoffDelays are the delays in seconds for checking drain-started file
// Total max wait time: 1+2+4+8 = 15 seconds
ExponentialBackoffDelays = "1 2 4 8"
)

// BuildDrainWaitScript generates the shell script for waiting on drain signals.
// If existingCommand is provided, it will be executed before the drain wait.
func BuildDrainWaitScript(existingCommand string) string {
drainLogic := `for i in ` + ExponentialBackoffDelays + `; do ` +
` if [ -f ` + DrainStartedFile + ` ]; then ` +
` until [ -f ` + DrainCompleteFile + ` ]; do sleep ` + DrainCheckInterval + `; done; ` +
` exit 0; ` +
` fi; ` +
` sleep $i; ` +
`done; ` +
`exit 0`

if existingCommand != "" {
return existingCommand + "; " + drainLogic
}
return drainLogic
}

// QueueProxyPreStopScript is the script executed by queue-proxy's PreStop hook
const QueueProxyPreStopScript = "touch " + DrainStartedFile
Loading
Loading