-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Ensure websocket conections persist until done on queue-proxy drain #15759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| "net/http" | ||
| "os" | ||
| "strconv" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/kelseyhightower/envconfig" | ||
|
|
@@ -169,6 +170,8 @@ | |
| d := Defaults{ | ||
| Ctx: signals.NewContext(), | ||
| } | ||
| pendingRequests := atomic.Int32{} | ||
| pendingRequests.Store(0) | ||
|
Comment on lines
+173
to
+174
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above comment - let's merge this into a special http.Handler struct counting requests - that is created in the |
||
|
|
||
| // Parse the environment. | ||
| var env config | ||
|
|
@@ -234,7 +237,7 @@ | |
| // Enable TLS when certificate is mounted. | ||
| tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) | ||
|
|
||
| mainHandler, drainer := mainHandler(d.Ctx, env, d.Transport, probe, stats, logger) | ||
| mainHandler, drainer := mainHandler(d.Ctx, env, d.Transport, probe, stats, logger, &pendingRequests) | ||
| adminHandler := adminHandler(d.Ctx, logger, drainer) | ||
|
|
||
| // Enable TLS server when activator server certs are mounted. | ||
|
|
@@ -303,9 +306,23 @@ | |
| return err | ||
| case <-d.Ctx.Done(): | ||
| logger.Info("Received TERM signal, attempting to gracefully shutdown servers.") | ||
| logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) | ||
| drainer.Drain() | ||
|
|
||
| // Wait on active requests to complete. This is done explictly | ||
|
Check failure on line 311 in pkg/queue/sharedmain/main.go
|
||
| // to avoid closing any connections which have been highjacked, | ||
| // as in net/http `.Shutdown` would do so ungracefully. | ||
| // See https://github.com/golang/go/issues/17721 | ||
| ticker := time.NewTicker(1 * time.Second) | ||
| defer ticker.Stop() | ||
| logger.Infof("Drain: waiting for %d pending requests to complete", pendingRequests.Load()) | ||
| WaitOnPendingRequests: | ||
| for range ticker.C { | ||
| if pendingRequests.Load() <= 0 { | ||
| logger.Infof("Drain: all pending requests completed") | ||
| break WaitOnPendingRequests | ||
| } | ||
| } | ||
|
|
||
| for name, srv := range httpServers { | ||
| logger.Info("Shutting down server: ", name) | ||
| if err := srv.Shutdown(context.Background()); err != nil { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -322,6 +322,11 @@ | |
| idleTimeoutSeconds: 10, | ||
| delay: "20", | ||
| expectError: true, | ||
| }, { | ||
| name: "websocket does not drop after queue drain is called at 30s", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should make a separate this - cause this isn't really testing the |
||
| timeoutSeconds: 60, | ||
| delay: "45", | ||
| expectError: false, | ||
| }} | ||
| for _, tc := range testCases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
|
|
@@ -349,9 +354,47 @@ | |
| } | ||
| } | ||
|
|
||
| func TestWebSocketDrain(t *testing.T) { | ||
| clients := Setup(t) | ||
|
|
||
| testCases := []struct { | ||
| name string | ||
| timeoutSeconds int64 | ||
| delay string | ||
| expectError bool | ||
| }{{ | ||
| name: "websocket does not drop after queue drain is called", | ||
| timeoutSeconds: 60, | ||
| delay: "45", | ||
| expectError: false, | ||
| }} | ||
| for _, tc := range testCases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| names := test.ResourceNames{ | ||
| Service: test.ObjectNameForTest(t), | ||
| Image: wsServerTestImageName, | ||
| } | ||
|
|
||
| // Clean up in both abnormal and normal exits. | ||
| test.EnsureTearDown(t, clients, &names) | ||
|
|
||
| _, err := v1test.CreateServiceReady(t, clients, &names, | ||
| rtesting.WithRevisionTimeoutSeconds(tc.timeoutSeconds), | ||
| if err != nil { | ||
|
Check failure on line 383 in test/e2e/websocket_test.go
|
||
| t.Fatal("Failed to create WebSocket server:", err) | ||
|
Check failure on line 384 in test/e2e/websocket_test.go
|
||
| } | ||
|
Check failure on line 385 in test/e2e/websocket_test.go
|
||
| // Validate the websocket connection. | ||
| err = ValidateWebSocketConnection(t, clients, names, tc.delay) | ||
|
Check failure on line 387 in test/e2e/websocket_test.go
|
||
| if (err == nil && tc.expectError) || (err != nil && !tc.expectError) { | ||
|
Check failure on line 388 in test/e2e/websocket_test.go
|
||
| t.Error(err) | ||
|
Check failure on line 389 in test/e2e/websocket_test.go
|
||
| } | ||
|
Check failure on line 390 in test/e2e/websocket_test.go
|
||
| }) | ||
| } | ||
| } | ||
|
|
||
| func abs(a int) int { | ||
| if a < 0 { | ||
|
Check failure on line 396 in test/e2e/websocket_test.go
|
||
| return -a | ||
|
Check failure on line 397 in test/e2e/websocket_test.go
|
||
| } | ||
|
Check failure on line 398 in test/e2e/websocket_test.go
|
||
| return a | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create an struct that implements http.Handler and it holds the atomic counter - that would make this reusable
Your handler can hold the next handler so you can call
h.ServeHTTP(w, r)