From 4bd80e1497720e2fee9a134e2a102ef4477b4023 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 19 Apr 2023 09:57:20 -0400 Subject: [PATCH] fix #5036: addressing the handling of non-connection errors --- CHANGELOG.md | 1 + .../kubernetes/client/jdkhttp/JdkWebSocketImpl.java | 2 +- .../client/jetty/JettyHttpClientBuilder.java | 6 ++++++ .../kubernetes/client/jetty/JettyWebSocket.java | 3 ++- .../kubernetes/client/jetty/JettyWebSocketTest.java | 4 ++-- .../client/okhttp/OkHttpWebSocketImpl.java | 2 +- .../client/vertx/VertxHttpClientBuilder.java | 5 +++++ .../kubernetes/client/vertx/VertxWebSocket.java | 12 +++++++++++- .../io/fabric8/kubernetes/client/http/WebSocket.java | 4 ++-- .../client/dsl/internal/ExecWebSocketListener.java | 2 +- .../client/dsl/internal/PortForwarderWebsocket.java | 2 +- .../dsl/internal/PortForwarderWebsocketListener.java | 2 +- .../client/dsl/internal/WatchConnectionManager.java | 2 +- .../dsl/internal/WatcherWebSocketListener.java | 11 ++++++++--- .../dsl/internal/core/v1/PodOperationsImpl.java | 2 +- .../dsl/internal/ExecWebSocketListenerTest.java | 2 +- .../internal/PortForwarderWebsocketListenerTest.java | 2 +- 17 files changed, 46 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c99fe5a3757..b5a47495e22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * Fix #5015: executing resync as a locking operation to ensure resync event ordering * Fix #5020: updating the resourceVersion on a delete with finalizers * Fix #5033: port forwarding for clients other than okhttp needs to specify the subprotocol +* fix #5036: Better websocket error handling for protocol / client enforced errors, also update frame/message limits * Fix #5044: disable Vert.x instance file caching * Fix #5059: Vert.x InputStreamReader uses an empty Buffer sentinel to avoid NPE diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java index b32259a25cf..663ef8e54a1 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java @@ -82,7 +82,7 @@ public CompletionStage onClose(java.net.http.WebSocket webSocket, int statusC @Override public void onError(java.net.http.WebSocket webSocket, Throwable error) { - listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error); + listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error, false); } @Override diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java index 8daa983bcf9..32a48b605e8 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java @@ -39,6 +39,8 @@ public class JettyHttpClientBuilder extends StandardHttpClientBuilder { private static final int MAX_CONNECTIONS = Integer.MAX_VALUE; + // the default for etcd seems to be 3 MB, but we'll default to unlimited to have the same behavior across clients + private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE; public JettyHttpClientBuilder(JettyHttpClientFactory clientFactory) { super(clientFactory); @@ -62,6 +64,10 @@ public JettyHttpClient build() { } HttpClient sharedHttpClient = new HttpClient(newTransport(sslContextFactory, preferHttp11)); WebSocketClient sharedWebSocketClient = new WebSocketClient(new HttpClient(newTransport(sslContextFactory, preferHttp11))); + sharedWebSocketClient.setMaxBinaryMessageSize(MAX_WS_MESSAGE_SIZE); + // the api-server does not seem to fragment messages, so the frames can be very large + sharedWebSocketClient.setMaxFrameSize(MAX_WS_MESSAGE_SIZE); + sharedWebSocketClient.setMaxTextMessageSize(MAX_WS_MESSAGE_SIZE); sharedWebSocketClient.setIdleTimeout(Duration.ZERO); if (connectTimeout != null) { sharedHttpClient.setConnectTimeout(connectTimeout.toMillis()); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java index dafd7d7de02..7aa98815559 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.exceptions.UpgradeException; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeUnit; @@ -140,7 +141,7 @@ public void onWebSocketError(Throwable cause) { // - Jetty throws a ClosedChannelException return; } - listener.onError(this, cause); + listener.onError(this, cause, cause instanceof IOException); } private void backPressure() { diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java index 83a3510e869..9b39e9af1e8 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java @@ -258,8 +258,8 @@ public void onClose(WebSocket webSocket, int code, String reason) { } @Override - public void onError(WebSocket webSocket, Throwable error) { - events.put("onError", new Object[] { error }); + public void onError(WebSocket webSocket, Throwable error, boolean connectionError) { + events.put("onError", new Object[] { error, connectionError }); } } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java index 0bfab821aa1..3608de34d10 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java @@ -92,7 +92,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons future.completeExceptionally(t); } } else { - listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t); + listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t, true); } } diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java index eb335fe4a96..628effc16d7 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java @@ -36,6 +36,8 @@ public class VertxHttpClientBuilder extends StandardHttpClientBuilder, F, VertxHttpClientBuilder> { private static final int MAX_CONNECTIONS = 8192; + // the default for etcd seems to be 3 MB, but we'll default to unlimited to have the same behavior across clients + private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE; final Vertx vertx; @@ -51,6 +53,9 @@ public VertxHttpClient build() { options.setMaxPoolSize(MAX_CONNECTIONS); options.setMaxWebSockets(MAX_CONNECTIONS); options.setIdleTimeoutUnit(TimeUnit.SECONDS); + // the api-server does not seem to fragment messages, so the frames can be very large + options.setMaxWebSocketFrameSize(MAX_WS_MESSAGE_SIZE); + options.setMaxWebSocketMessageSize(MAX_WS_MESSAGE_SIZE); if (this.connectTimeout != null) { options.setConnectTimeout((int) this.connectTimeout.toMillis()); diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java index 3bbb94dd370..c7c8300f564 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java @@ -20,6 +20,7 @@ import io.netty.buffer.Unpooled; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClosedException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; @@ -52,7 +53,16 @@ void init() { ws.pongHandler(b -> ws.fetch(1)); // use end, not close, because close is processed immediately vs. end is in frame order ws.endHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason())); - ws.exceptionHandler(err -> listener.onError(this, err)); + ws.exceptionHandler(err -> { + try { + listener.onError(this, err, err instanceof HttpClosedException); + } finally { + // onError should be terminal + if (!ws.isClosed()) { + ws.close(); + } + } + }); listener.onOpen(this); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java index bed6a592983..f57fc82ecdf 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java @@ -24,7 +24,7 @@ public interface WebSocket { /** * Callback methods for websocket events. The methods are - * guaranteed to be called serially - except for {@link #onError(WebSocket, Throwable)} + * guaranteed to be called serially - except for {@link Listener#onError(WebSocket, Throwable, boolean)} */ interface Listener { @@ -59,7 +59,7 @@ default void onClose(WebSocket webSocket, int code, String reason) { * Called when an error has occurred. It's a terminal event, calls to {@link WebSocket#request()} * do nothing after this. */ - default void onError(WebSocket webSocket, Throwable error) { + default void onError(WebSocket webSocket, Throwable error, boolean connectionError) { } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 5a0a5dd2ec9..6ddcad7934f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -261,7 +261,7 @@ public void onOpen(WebSocket webSocket) { } @Override - public void onError(WebSocket webSocket, Throwable t) { + public void onError(WebSocket webSocket, Throwable t, boolean connectionError) { closed.set(true); HttpResponse response = null; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java index 337a896eb1c..1e9d8a4eb06 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java @@ -178,7 +178,7 @@ public PortForward forward(URL resourceBaseUrl, int port, final ReadableByteChan socket.whenComplete((w, t) -> { if (t != null) { - listener.onError(w, t); + listener.onError(w, t, true); } }); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java index 60ebcf798ab..70c3354bdb8 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java @@ -159,7 +159,7 @@ public void onClose(WebSocket webSocket, int code, String reason) { } @Override - public void onError(WebSocket webSocket, Throwable t) { + public void onError(WebSocket webSocket, Throwable t, boolean connectionError) { logger.debug("{}: Throwable received from websocket", LOG_PREFIX, t); if (alive.get()) { serverThrowables.add(t); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 327deaf67e8..af19ebae7a4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -119,7 +119,7 @@ protected void start(URL url, Map headers, WatchRequestState sta if (ready) { // if we're not ready yet, that means we're waiting on the future and there's // no need to invoke the reconnect logic - listener.onError(w, t); + listener.onError(w, t, true); } throw KubernetesClientException.launderThrowable(t); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 170bbc406da..c4bf5712400 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -16,6 +16,7 @@ package io.fabric8.kubernetes.client.dsl.internal; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.WatchRequestState; import io.fabric8.kubernetes.client.http.WebSocket; import org.slf4j.Logger; @@ -42,9 +43,13 @@ public void onOpen(final WebSocket webSocket) { } @Override - public void onError(WebSocket webSocket, Throwable t) { - logger.debug("WebSocket error received", t); - manager.scheduleReconnect(state); + public void onError(WebSocket webSocket, Throwable t, boolean connectionError) { + if (connectionError) { + logger.debug("WebSocket error received", t); + manager.scheduleReconnect(state); + } else { + manager.close(new WatcherException("Could not process websocket message", t)); + } } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index a0866c186bf..3f7606ce14d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -383,7 +383,7 @@ private ExecWebSocketListener setupConnectionToPod(URI uri) { .buildAsync(execWebSocketListener); startedFuture.whenComplete((w, t) -> { if (t != null) { - execWebSocketListener.onError(w, t); + execWebSocketListener.onError(w, t, true); } }); Utils.waitUntilReadyOrFail(startedFuture, getRequestConfig().getWebsocketTimeout(), TimeUnit.MILLISECONDS); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java index 5e1e2f10fe0..ce5dc1e26c9 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java @@ -95,7 +95,7 @@ void testCheckErrorHasErrorFromMessageShouldThrowException() { void testCheckErrorHasErrorFromFailureShouldThrowException() { ExecWebSocketListener listener = new ExecWebSocketListener(new PodOperationContext()); - listener.onError(null, new IOException("here")); + listener.onError(null, new IOException("here"), true); assertThrows(KubernetesClientException.class, () -> listener.checkError()); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java index 2b9d835bee0..64390a7fe77 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java @@ -108,7 +108,7 @@ void onOpen_withException_shouldCloseWebSocketAndStoreException() throws IOExcep @Test void onError_shouldStoreExceptionAndCloseChannels() { listener = new PortForwarderWebsocketListener(in, out, CommonThreadPool.get()); - listener.onError(webSocket, new RuntimeException("Server error")); + listener.onError(webSocket, new RuntimeException("Server error"), true); // Then assertThat(listener.getServerThrowables()) .singleElement()