Skip to content

Commit 8850ca4

Browse files
authored
Merge pull request #2429 from dani8art/cherry-pick-16-some-fixes
Cherry pick 16: some bug fixes
2 parents 915e866 + 4b6a89a commit 8850ca4

File tree

5 files changed

+152
-25
lines changed

5 files changed

+152
-25
lines changed

extended/src/main/java/io/kubernetes/client/extended/kubectl/KubectlExec.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import java.io.IOException;
2222
import java.io.InputStream;
2323
import java.io.OutputStream;
24+
import java.util.Optional;
25+
import java.util.function.Consumer;
2426

2527
public class KubectlExec extends Kubectl.ResourceAndContainerBuilder<V1Pod, KubectlExec>
2628
implements Kubectl.Executable<Integer> {
2729
private String[] command;
2830
private boolean stdin;
2931
private boolean tty;
32+
private Consumer<Throwable> onUnhandledError = Throwable::printStackTrace;
3033

3134
KubectlExec() {
3235
super(V1Pod.class);
@@ -47,33 +50,43 @@ public KubectlExec tty(boolean tty) {
4750
return this;
4851
}
4952

53+
public KubectlExec onUnhandledError(Consumer<Throwable> onUnhandledError) {
54+
this.onUnhandledError = onUnhandledError;
55+
return this;
56+
}
57+
5058
@Override
5159
public Integer execute() throws KubectlException {
5260
V1Pod pod = new V1Pod().metadata(new V1ObjectMeta().name(name).namespace(namespace));
5361

5462
Exec exec = new Exec(apiClient);
63+
exec.setOnUnhandledError(onUnhandledError);
64+
5565
try {
5666
Process proc = exec.exec(pod, command, container, stdin, tty);
57-
copyAsync(proc.getInputStream(), System.out);
58-
copyAsync(proc.getErrorStream(), System.err);
67+
copyAsync(proc.getInputStream(), System.out, onUnhandledError);
68+
copyAsync(proc.getErrorStream(), System.err, onUnhandledError);
5969
if (stdin) {
60-
copyAsync(System.in, proc.getOutputStream());
70+
copyAsync(System.in, proc.getOutputStream(), onUnhandledError);
6171
}
6272
return proc.waitFor();
6373
} catch (InterruptedException | ApiException | IOException ex) {
6474
throw new KubectlException(ex);
6575
}
6676
}
6777

68-
protected static Thread copyAsync(InputStream in, OutputStream out) {
78+
protected static Thread copyAsync(
79+
InputStream in, OutputStream out, Consumer<Throwable> onUnhandledError) {
6980
Thread t =
7081
new Thread(
7182
new Runnable() {
7283
public void run() {
7384
try {
7485
Streams.copy(in, out);
7586
} catch (IOException ex) {
76-
ex.printStackTrace();
87+
Optional.ofNullable(onUnhandledError)
88+
.orElse(Throwable::printStackTrace)
89+
.accept(ex);
7790
}
7891
}
7992
});

extended/src/main/java/io/kubernetes/client/extended/kubectl/KubectlPortForward.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@
2525
import java.net.Socket;
2626
import java.util.ArrayList;
2727
import java.util.List;
28+
import java.util.Optional;
29+
import java.util.function.Consumer;
2830

2931
public class KubectlPortForward
3032
extends Kubectl.ResourceAndContainerBuilder<V1Pod, KubectlPortForward>
3133
implements Kubectl.Executable<Boolean> {
3234
List<Integer> localPorts;
3335
List<Integer> targetPorts;
3436
boolean running;
37+
Consumer<Throwable> onUnhandledError = Throwable::printStackTrace;
3538

3639
KubectlPortForward() {
3740
super(V1Pod.class);
@@ -52,6 +55,11 @@ public KubectlPortForward ports(int localPort, int targetPort) {
5255
return this;
5356
}
5457

58+
public KubectlPortForward onUnhandledError(Consumer<Throwable> onUnhandledError) {
59+
this.onUnhandledError = onUnhandledError;
60+
return this;
61+
}
62+
5563
@Override
5664
public Boolean execute() throws KubectlException {
5765
running = true;
@@ -98,13 +106,15 @@ public void run() {
98106
while (running) {
99107
try {
100108
Socket sock = server.accept();
101-
Thread t1 = copyAsync(sock.getInputStream(), out);
102-
Thread t2 = copyAsync(in, sock.getOutputStream());
109+
Thread t1 = copyAsync(sock.getInputStream(), out, onUnhandledError);
110+
Thread t2 = copyAsync(in, sock.getOutputStream(), onUnhandledError);
103111

104112
t1.join();
105113
t2.join();
106114
} catch (InterruptedException | IOException ex) {
107-
ex.printStackTrace();
115+
Optional.ofNullable(onUnhandledError)
116+
.orElse(Throwable::printStackTrace)
117+
.accept(ex);
108118
}
109119
}
110120
}

util/src/main/java/io/kubernetes/client/Exec.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashMap;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.Optional;
4243
import java.util.concurrent.CompletableFuture;
4344
import java.util.concurrent.CountDownLatch;
4445
import java.util.concurrent.Future;
@@ -55,6 +56,8 @@ public class Exec {
5556

5657
private ApiClient apiClient;
5758

59+
private Consumer<Throwable> onUnhandledError;
60+
5861
/** Simple Exec API constructor, uses default configuration */
5962
public Exec() {
6063
this(Configuration.getDefaultApiClient());
@@ -87,6 +90,26 @@ public void setApiClient(ApiClient apiClient) {
8790
this.apiClient = apiClient;
8891
}
8992

93+
/**
94+
* Get a {@link Consumer<Throwable>} that will be accepted if there is any unhandled exception
95+
* while the websocket communication is happening.
96+
*
97+
* @return The {@link Consumer<Throwable>} that will be used.
98+
*/
99+
public Consumer<Throwable> getOnUnhandledError() {
100+
return onUnhandledError;
101+
}
102+
103+
/**
104+
* Set the {@link Consumer<Throwable>} that will be accepted if there is any unhandled exception
105+
* while the websocket communication is happening.
106+
*
107+
* @param onUnhandledError The new {@link Consumer<Throwable>} to use.
108+
*/
109+
public void setOnUnhandledError(Consumer<Throwable> onUnhandledError) {
110+
this.onUnhandledError = onUnhandledError;
111+
}
112+
90113
/**
91114
* Setup a Builder for the given namespace, name and command
92115
*
@@ -195,6 +218,7 @@ public Process exec(
195218
.setContainer(container)
196219
.setStdin(stdin)
197220
.setTty(tty)
221+
.setOnUnhandledError(onUnhandledError)
198222
.execute();
199223
}
200224

@@ -264,10 +288,10 @@ public Future<Integer> exec(
264288
Supplier<Integer> returnCode = process::exitValue;
265289
try {
266290
log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr);
267-
boolean beforeTimout =
291+
boolean beforeTimeout =
268292
waitForProcessToExit(
269293
process, timeoutMs, cmdStr, err -> errHandler.accept(err, io));
270-
if (!beforeTimout) {
294+
if (!beforeTimeout) {
271295
returnCode = () -> Integer.MAX_VALUE;
272296
}
273297
} catch (Exception e) {
@@ -332,6 +356,7 @@ public final class ExecutionBuilder {
332356
private boolean stdout;
333357
private boolean stderr;
334358
private boolean tty;
359+
private Consumer<Throwable> onUnhandledError;
335360

336361
private ExecutionBuilder(String namespace, String name, String[] command) {
337362
this.namespace = namespace;
@@ -399,6 +424,15 @@ public ExecutionBuilder setTty(boolean tty) {
399424
return this;
400425
}
401426

427+
public Consumer<Throwable> getOnUnhandledError() {
428+
return onUnhandledError;
429+
}
430+
431+
public ExecutionBuilder setOnUnhandledError(Consumer<Throwable> onUnhandledError) {
432+
this.onUnhandledError = onUnhandledError;
433+
return this;
434+
}
435+
402436
private String makePath() {
403437
String[] encodedCommand = new String[command.length];
404438
for (int i = 0; i < command.length; i++) {
@@ -433,7 +467,7 @@ public Process execute() throws ApiException, IOException {
433467
container = pod.getSpec().getContainers().get(0).getName();
434468
}
435469

436-
ExecProcess exec = new ExecProcess(apiClient);
470+
ExecProcess exec = new ExecProcess(apiClient, onUnhandledError);
437471
WebSocketStreamHandler handler = exec.getHandler();
438472
WebSockets.stream(makePath(), "GET", apiClient, handler);
439473

@@ -482,12 +516,20 @@ static int parseExitCode(ApiClient client, InputStream inputStream) {
482516

483517
public static class ExecProcess extends Process {
484518
private final WebSocketStreamHandler streamHandler;
519+
private final Consumer<Throwable> onUnhandledError;
485520
private int statusCode = -1;
486521
private boolean isAlive = true;
487522
private final Map<Integer, InputStream> input = new HashMap<>();
488523
private final CountDownLatch latch = new CountDownLatch(1);
489524

490525
public ExecProcess(final ApiClient apiClient) throws IOException {
526+
this(apiClient, Throwable::printStackTrace);
527+
}
528+
529+
public ExecProcess(final ApiClient apiClient, final Consumer<Throwable> onUnhandledError)
530+
throws IOException {
531+
this.onUnhandledError =
532+
Optional.ofNullable(onUnhandledError).orElse(Throwable::printStackTrace);
491533
this.streamHandler =
492534
new WebSocketStreamHandler() {
493535
@Override
@@ -513,12 +555,8 @@ protected void handleMessage(int stream, InputStream inStream) throws IOExceptio
513555
@Override
514556
public void failure(Throwable ex) {
515557
super.failure(ex);
516-
// TODO, it's possible we should suppress this error message, but
517-
// currently there's
518-
// no good place to surface the message, and without it, this will be
519-
// really hard to
520-
// debug.
521-
ex.printStackTrace();
558+
ExecProcess.this.onUnhandledError.accept(ex);
559+
522560
synchronized (ExecProcess.this) {
523561
// Try for a pretty unique error code, so if someone searches
524562
// they'll find this

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.io.PipedOutputStream;
2323
import java.io.Reader;
2424
import java.nio.charset.StandardCharsets;
25+
import java.time.Instant;
26+
import java.time.temporal.ChronoUnit;
2527
import java.util.HashMap;
2628
import java.util.Map;
2729
import okhttp3.WebSocket;
@@ -111,12 +113,6 @@ public Throwable getError() {
111113
@Override
112114
public synchronized void close() {
113115
if (state != State.CLOSED) {
114-
state = State.CLOSED;
115-
if (null != socket) {
116-
// code 1000 means "Normal Closure"
117-
socket.close(1000, "Triggered client-side terminate");
118-
log.debug("Successfully closed socket.");
119-
}
120116
// Close all output streams. Caller of getInputStream(int) is responsible
121117
// for closing returned input streams
122118
for (PipedOutputStream out : pipedOutput.values()) {
@@ -143,6 +139,13 @@ public synchronized void close() {
143139
log.error("Error on close", ex);
144140
}
145141
}
142+
143+
state = State.CLOSED;
144+
if (null != socket) {
145+
// code 1000 means "Normal Closure"
146+
socket.close(1000, "Triggered client-side terminate");
147+
log.debug("Successfully closed socket.");
148+
}
146149
}
147150
notifyAll();
148151
}
@@ -209,6 +212,13 @@ private synchronized OutputStream getSocketInputOutputStream(int stream) {
209212
}
210213

211214
private class WebSocketOutputStream extends OutputStream {
215+
216+
private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024;
217+
218+
private static final int MAX_WAIT_MILLIS = 10000;
219+
220+
private static final int WAIT_MILLIS = 10;
221+
212222
private final byte stream;
213223

214224
public WebSocketOutputStream(int stream) {
@@ -265,10 +275,28 @@ public void write(byte[] b, int offset, int length) throws IOException {
265275
int bufferSize = Math.min(remaining, 15 * 1024 * 1024);
266276
byte[] buffer = new byte[bufferSize + 1];
267277
buffer[0] = stream;
278+
268279
System.arraycopy(b, offset + bytesWritten, buffer, 1, bufferSize);
269-
if (!WebSocketStreamHandler.this.socket.send(ByteString.of(buffer))) {
270-
throw new IOException("WebSocket has closed.");
280+
ByteString byteString = ByteString.of(buffer);
281+
282+
final Instant start = Instant.now();
283+
synchronized (WebSocketOutputStream.this) {
284+
while (WebSocketStreamHandler.this.socket.queueSize() + byteString.size() > MAX_QUEUE_SIZE
285+
&& Instant.now().isBefore(start.plus(MAX_WAIT_MILLIS, ChronoUnit.MILLIS))) {
286+
try {
287+
wait(WAIT_MILLIS);
288+
} catch (InterruptedException e) {
289+
throw new IOException("Error waiting web socket queue", e);
290+
}
291+
}
292+
293+
if (!WebSocketStreamHandler.this.socket.send(byteString)) {
294+
throw new IOException("WebSocket has closed.");
295+
}
296+
297+
notifyAll();
271298
}
299+
272300
bytesWritten += bufferSize;
273301
remaining -= bufferSize;
274302
}

util/src/test/java/io/kubernetes/client/ExecTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
2020
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
2226

2327
import com.github.tomakehurst.wiremock.core.Admin;
2428
import com.github.tomakehurst.wiremock.extension.Parameters;
@@ -40,6 +44,7 @@
4044
import java.nio.charset.StandardCharsets;
4145
import java.util.concurrent.CountDownLatch;
4246
import java.util.concurrent.Semaphore;
47+
import java.util.function.Consumer;
4348
import org.junit.Before;
4449
import org.junit.Rule;
4550
import org.junit.Test;
@@ -147,9 +152,41 @@ public void testExecProcess() throws IOException, InterruptedException {
147152
assertEquals(0, process.exitValue());
148153
}
149154

155+
@Test
156+
public void testDefaultUnhandledError() throws IOException, InterruptedException {
157+
final Throwable throwable = mock(Throwable.class);
158+
final ExecProcess process = new ExecProcess(client);
159+
process.getHandler().open("wss", null);
160+
161+
process.getHandler().failure(throwable);
162+
process.waitFor();
163+
164+
verify(throwable, times(1)).printStackTrace();
165+
assertEquals(false, process.isAlive());
166+
assertEquals(-1975219, process.exitValue());
167+
}
168+
169+
@Test
170+
public void testCustomUnhandledError() throws IOException, InterruptedException {
171+
final Consumer<Throwable> consumer = mock(Consumer.class);
172+
final Throwable throwable = mock(Throwable.class);
173+
final ExecProcess process = new ExecProcess(client, consumer);
174+
process.getHandler().open("wss", null);
175+
176+
process.getHandler().failure(throwable);
177+
process.waitFor();
178+
179+
verify(throwable, times(0)).printStackTrace();
180+
verify(consumer, times(1)).accept(throwable);
181+
assertEquals(false, process.isAlive());
182+
assertEquals(-1975219, process.exitValue());
183+
}
184+
150185
@Test
151186
public void testUrl() throws IOException, ApiException, InterruptedException {
187+
final Consumer<Throwable> consumer = mock(Consumer.class);
152188
Exec exec = new Exec(client);
189+
exec.setOnUnhandledError(consumer);
153190

154191
V1Pod pod = new V1Pod().metadata(new V1ObjectMeta().name(podName).namespace(namespace));
155192

@@ -202,6 +239,7 @@ public void testUrl() throws IOException, ApiException, InterruptedException {
202239
.withQueryParam("command", equalTo("cmd")));
203240

204241
assertEquals(-1975219, p.exitValue());
242+
verify(consumer, times(1)).accept(any(Throwable.class));
205243
}
206244

207245
@Test

0 commit comments

Comments
 (0)