Skip to content

Commit

Permalink
fix fabric8io#5036: addressing the handling of non-connection errors
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Apr 27, 2023
1 parent 6b5f135 commit 4bd80e1
Show file tree
Hide file tree
Showing 17 changed files with 46 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Fix #5015: executing resync as a locking operation to ensure resync event ordering
* Fix #5020: updating the resourceVersion on a delete with finalizers
* Fix #5033: port forwarding for clients other than okhttp needs to specify the subprotocol
* fix #5036: Better websocket error handling for protocol / client enforced errors, also update frame/message limits
* Fix #5044: disable Vert.x instance file caching
* Fix #5059: Vert.x InputStreamReader uses an empty Buffer sentinel to avoid NPE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusC

@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error);
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class JettyHttpClientBuilder
extends StandardHttpClientBuilder<JettyHttpClient, JettyHttpClientFactory, JettyHttpClientBuilder> {

private static final int MAX_CONNECTIONS = Integer.MAX_VALUE;
// the default for etcd seems to be 3 MB, but we'll default to unlimited to have the same behavior across clients
private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE;

public JettyHttpClientBuilder(JettyHttpClientFactory clientFactory) {
super(clientFactory);
Expand All @@ -62,6 +64,10 @@ public JettyHttpClient build() {
}
HttpClient sharedHttpClient = new HttpClient(newTransport(sslContextFactory, preferHttp11));
WebSocketClient sharedWebSocketClient = new WebSocketClient(new HttpClient(newTransport(sslContextFactory, preferHttp11)));
sharedWebSocketClient.setMaxBinaryMessageSize(MAX_WS_MESSAGE_SIZE);
// the api-server does not seem to fragment messages, so the frames can be very large
sharedWebSocketClient.setMaxFrameSize(MAX_WS_MESSAGE_SIZE);
sharedWebSocketClient.setMaxTextMessageSize(MAX_WS_MESSAGE_SIZE);
sharedWebSocketClient.setIdleTimeout(Duration.ZERO);
if (connectTimeout != null) {
sharedHttpClient.setConnectTimeout(connectTimeout.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.exceptions.UpgradeException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -140,7 +141,7 @@ public void onWebSocketError(Throwable cause) {
// - Jetty throws a ClosedChannelException
return;
}
listener.onError(this, cause);
listener.onError(this, cause, cause instanceof IOException);
}

private void backPressure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ public void onClose(WebSocket webSocket, int code, String reason) {
}

@Override
public void onError(WebSocket webSocket, Throwable error) {
events.put("onError", new Object[] { error });
public void onError(WebSocket webSocket, Throwable error, boolean connectionError) {
events.put("onError", new Object[] { error, connectionError });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons
future.completeExceptionally(t);
}
} else {
listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t);
listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class VertxHttpClientBuilder<F extends HttpClient.Factory>
extends StandardHttpClientBuilder<VertxHttpClient<F>, F, VertxHttpClientBuilder<F>> {

private static final int MAX_CONNECTIONS = 8192;
// the default for etcd seems to be 3 MB, but we'll default to unlimited to have the same behavior across clients
private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE;

final Vertx vertx;

Expand All @@ -51,6 +53,9 @@ public VertxHttpClient<F> build() {
options.setMaxPoolSize(MAX_CONNECTIONS);
options.setMaxWebSockets(MAX_CONNECTIONS);
options.setIdleTimeoutUnit(TimeUnit.SECONDS);
// the api-server does not seem to fragment messages, so the frames can be very large
options.setMaxWebSocketFrameSize(MAX_WS_MESSAGE_SIZE);
options.setMaxWebSocketMessageSize(MAX_WS_MESSAGE_SIZE);

if (this.connectTimeout != null) {
options.setConnectTimeout((int) this.connectTimeout.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.buffer.Unpooled;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClosedException;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -52,7 +53,16 @@ void init() {
ws.pongHandler(b -> ws.fetch(1));
// use end, not close, because close is processed immediately vs. end is in frame order
ws.endHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason()));
ws.exceptionHandler(err -> listener.onError(this, err));
ws.exceptionHandler(err -> {
try {
listener.onError(this, err, err instanceof HttpClosedException);
} finally {
// onError should be terminal
if (!ws.isClosed()) {
ws.close();
}
}
});
listener.onOpen(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface WebSocket {

/**
* Callback methods for websocket events. The methods are
* guaranteed to be called serially - except for {@link #onError(WebSocket, Throwable)}
* guaranteed to be called serially - except for {@link Listener#onError(WebSocket, Throwable, boolean)}
*/
interface Listener {

Expand Down Expand Up @@ -59,7 +59,7 @@ default void onClose(WebSocket webSocket, int code, String reason) {
* Called when an error has occurred. It's a terminal event, calls to {@link WebSocket#request()}
* do nothing after this.
*/
default void onError(WebSocket webSocket, Throwable error) {
default void onError(WebSocket webSocket, Throwable error, boolean connectionError) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void onOpen(WebSocket webSocket) {
}

@Override
public void onError(WebSocket webSocket, Throwable t) {
public void onError(WebSocket webSocket, Throwable t, boolean connectionError) {
closed.set(true);
HttpResponse<?> response = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public PortForward forward(URL resourceBaseUrl, int port, final ReadableByteChan

socket.whenComplete((w, t) -> {
if (t != null) {
listener.onError(w, t);
listener.onError(w, t, true);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void onClose(WebSocket webSocket, int code, String reason) {
}

@Override
public void onError(WebSocket webSocket, Throwable t) {
public void onError(WebSocket webSocket, Throwable t, boolean connectionError) {
logger.debug("{}: Throwable received from websocket", LOG_PREFIX, t);
if (alive.get()) {
serverThrowables.add(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected void start(URL url, Map<String, String> headers, WatchRequestState sta
if (ready) {
// if we're not ready yet, that means we're waiting on the future and there's
// no need to invoke the reconnect logic
listener.onError(w, t);
listener.onError(w, t, true);
}
throw KubernetesClientException.launderThrowable(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.WatchRequestState;
import io.fabric8.kubernetes.client.http.WebSocket;
import org.slf4j.Logger;
Expand All @@ -42,9 +43,13 @@ public void onOpen(final WebSocket webSocket) {
}

@Override
public void onError(WebSocket webSocket, Throwable t) {
logger.debug("WebSocket error received", t);
manager.scheduleReconnect(state);
public void onError(WebSocket webSocket, Throwable t, boolean connectionError) {
if (connectionError) {
logger.debug("WebSocket error received", t);
manager.scheduleReconnect(state);
} else {
manager.close(new WatcherException("Could not process websocket message", t));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private ExecWebSocketListener setupConnectionToPod(URI uri) {
.buildAsync(execWebSocketListener);
startedFuture.whenComplete((w, t) -> {
if (t != null) {
execWebSocketListener.onError(w, t);
execWebSocketListener.onError(w, t, true);
}
});
Utils.waitUntilReadyOrFail(startedFuture, getRequestConfig().getWebsocketTimeout(), TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void testCheckErrorHasErrorFromMessageShouldThrowException() {
void testCheckErrorHasErrorFromFailureShouldThrowException() {
ExecWebSocketListener listener = new ExecWebSocketListener(new PodOperationContext());

listener.onError(null, new IOException("here"));
listener.onError(null, new IOException("here"), true);

assertThrows(KubernetesClientException.class, () -> listener.checkError());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void onOpen_withException_shouldCloseWebSocketAndStoreException() throws IOExcep
@Test
void onError_shouldStoreExceptionAndCloseChannels() {
listener = new PortForwarderWebsocketListener(in, out, CommonThreadPool.get());
listener.onError(webSocket, new RuntimeException("Server error"));
listener.onError(webSocket, new RuntimeException("Server error"), true);
// Then
assertThat(listener.getServerThrowables())
.singleElement()
Expand Down

0 comments on commit 4bd80e1

Please sign in to comment.