diff --git a/go.mod b/go.mod index ecf9ed6dea..08127a585b 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/viper v1.16.0 github.com/stretchr/testify v1.8.4 - github.com/tektoncd/pipeline v0.50.2 + github.com/tektoncd/pipeline v0.50.3 github.com/tektoncd/plumbing v0.0.0-20220817140952-3da8ce01aeeb go.uber.org/automaxprocs v1.5.3 go.uber.org/zap v1.25.0 @@ -165,7 +165,7 @@ require ( google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect - google.golang.org/grpc v1.57.0 // indirect + google.golang.org/grpc v1.57.2 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index b77f594dcc..e129e308f5 100644 --- a/go.sum +++ b/go.sum @@ -1134,8 +1134,8 @@ github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= -github.com/tektoncd/pipeline v0.50.2 h1:4uxwjXK71ai9U/ff2DZQ41rR8vObssjAxN8C1exI7cI= -github.com/tektoncd/pipeline v0.50.2/go.mod h1:MRjZCAtVxzDYKvKjPkbtv6kw8TCujnSlFhl0u8JXOmE= +github.com/tektoncd/pipeline v0.50.3 h1:4QQza8hewF+4wdBLbRw26CDQx0/y58WYfZwQ/xAsvOQ= +github.com/tektoncd/pipeline v0.50.3/go.mod h1:33ZU30CR8Pbr6Pb4l7+Tz1oPGsJBY5yxyG8Z+ejGO0w= github.com/tektoncd/plumbing v0.0.0-20220817140952-3da8ce01aeeb h1:LUUCR8pLF+MzdQ7kOQQrMzDahIPZLdPCzfnNow1Um3Y= github.com/tektoncd/plumbing v0.0.0-20220817140952-3da8ce01aeeb/go.mod h1:uJBaI0AL/kjPThiMYZcWRujEz7D401v643d6s/21GAg= github.com/theupdateframework/go-tuf v0.5.2 h1:habfDzTmpbzBLIFGWa2ZpVhYvFBoK0C1onC3a4zuPRA= @@ -1770,8 +1770,8 @@ google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.57.2 h1:uw37EN34aMFFXB2QPW7Tq6tdTbind1GpRxw5aOX3a5k= +google.golang.org/grpc v1.57.2/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index f960640128..0bc7a7d576 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -171,15 +171,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, ID: http2.SettingMaxFrameSize, Val: http2MaxFrameLen, }} - // TODO(zhaoq): Have a better way to signal "no limit" because 0 is - // permitted in the HTTP2 spec. - maxStreams := config.MaxStreams - if maxStreams == 0 { - maxStreams = math.MaxUint32 - } else { + if config.MaxStreams != math.MaxUint32 { isettings = append(isettings, http2.Setting{ ID: http2.SettingMaxConcurrentStreams, - Val: maxStreams, + Val: config.MaxStreams, }) } dynamicWindow := true @@ -258,7 +253,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, framer: framer, readerDone: make(chan struct{}), writerDone: make(chan struct{}), - maxStreams: maxStreams, + maxStreams: config.MaxStreams, inTapHandle: config.InTapHandle, fc: &trInFlow{limit: uint32(icwz)}, state: reachable, diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index e076ec7143..e6f8925966 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -115,12 +115,6 @@ type serviceInfo struct { mdata interface{} } -type serverWorkerData struct { - st transport.ServerTransport - wg *sync.WaitGroup - stream *transport.Stream -} - // Server is a gRPC server to serve RPC requests. type Server struct { opts serverOptions @@ -145,7 +139,7 @@ type Server struct { channelzID *channelz.Identifier czData *channelzData - serverWorkerChannel chan *serverWorkerData + serverWorkerChannel chan func() } type serverOptions struct { @@ -178,6 +172,7 @@ type serverOptions struct { } var defaultServerOptions = serverOptions{ + maxConcurrentStreams: math.MaxUint32, maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize, connectionTimeout: 120 * time.Second, @@ -389,6 +384,9 @@ func MaxSendMsgSize(m int) ServerOption { // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number // of concurrent streams to each ServerTransport. func MaxConcurrentStreams(n uint32) ServerOption { + if n == 0 { + n = math.MaxUint32 + } return newFuncServerOption(func(o *serverOptions) { o.maxConcurrentStreams = n }) @@ -590,24 +588,19 @@ const serverWorkerResetThreshold = 1 << 16 // [1] https://github.com/golang/go/issues/18138 func (s *Server) serverWorker() { for completed := 0; completed < serverWorkerResetThreshold; completed++ { - data, ok := <-s.serverWorkerChannel + f, ok := <-s.serverWorkerChannel if !ok { return } - s.handleSingleStream(data) + f() } go s.serverWorker() } -func (s *Server) handleSingleStream(data *serverWorkerData) { - defer data.wg.Done() - s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) -} - // initServerWorkers creates worker goroutines and a channel to process incoming // connections to reduce the time spent overall on runtime.morestack. func (s *Server) initServerWorkers() { - s.serverWorkerChannel = make(chan *serverWorkerData) + s.serverWorkerChannel = make(chan func()) for i := uint32(0); i < s.opts.numServerWorkers; i++ { go s.serverWorker() } @@ -966,21 +959,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close(errors.New("finished serving streams for the server transport")) var wg sync.WaitGroup + streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) + + streamQuota.acquire() + f := func() { + defer streamQuota.release() + defer wg.Done() + s.handleStream(st, stream, s.traceInfo(st, stream)) + } + if s.opts.numServerWorkers > 0 { - data := &serverWorkerData{st: st, wg: &wg, stream: stream} select { - case s.serverWorkerChannel <- data: + case s.serverWorkerChannel <- f: return default: // If all stream workers are busy, fallback to the default code path. } } - go func() { - defer wg.Done() - s.handleStream(st, stream, s.traceInfo(st, stream)) - }() + go f() }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx @@ -2075,3 +2073,32 @@ func validateSendCompressor(name, clientCompressors string) error { } return fmt.Errorf("client does not support compressor %q", name) } + +// atomicSemaphore implements a blocking, counting semaphore. acquire should be +// called synchronously; release may be called asynchronously. +type atomicSemaphore struct { + n int64 // accessed atomically + wait chan struct{} +} + +func (q *atomicSemaphore) acquire() { + if atomic.AddInt64(&q.n, -1) < 0 { + // We ran out of quota. Block until a release happens. + <-q.wait + } +} + +func (q *atomicSemaphore) release() { + // N.B. the "<= 0" check below should allow for this to work with multiple + // concurrent calls to acquire, but also note that with synchronous calls to + // acquire, as our system does, n will never be less than -1. There are + // fairness issues (queuing) to consider if this was to be generalized. + if atomic.AddInt64(&q.n, 1) <= 0 { + // An acquire was waiting on us. Unblock it. + q.wait <- struct{}{} + } +} + +func newHandlerQuota(n uint32) *atomicSemaphore { + return &atomicSemaphore{n: int64(n), wait: make(chan struct{}, 1)} +} diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 353cfd5286..f80ff441f1 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.57.0" +const Version = "1.57.2" diff --git a/vendor/modules.txt b/vendor/modules.txt index 45dda95f01..c27845bbac 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -510,7 +510,7 @@ github.com/stretchr/testify/require # github.com/subosito/gotenv v1.4.2 ## explicit; go 1.18 github.com/subosito/gotenv -# github.com/tektoncd/pipeline v0.50.2 +# github.com/tektoncd/pipeline v0.50.3 ## explicit; go 1.19 github.com/tektoncd/pipeline/pkg/apis/config github.com/tektoncd/pipeline/pkg/apis/config/resolver @@ -805,7 +805,7 @@ google.golang.org/genproto/googleapis/api/httpbody # google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 ## explicit; go 1.19 google.golang.org/genproto/googleapis/rpc/status -# google.golang.org/grpc v1.57.0 +# google.golang.org/grpc v1.57.2 ## explicit; go 1.17 google.golang.org/grpc google.golang.org/grpc/attributes