Skip to content

Commit

Permalink
fix: RequestConfig is propagated to derived HttpClient instances
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Nuri <[email protected]>
  • Loading branch information
manusa committed May 11, 2023
1 parent c26ec27 commit 8cf4804
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 6.7-SNAPSHOT

#### Bugs
Fix #5121: RequestConfig is propagated to derived HttpClient instances

#### Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.RequestConfigBuilder;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
Expand All @@ -31,29 +32,31 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LogWatchCallback implements LogWatch, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(LogWatchCallback.class);

private OutputStream out;
private final OutputStream out;
private WritableByteChannel outChannel;
private final OperationContext context;
private volatile InputStream output;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final CompletableFuture<AsyncBody> asyncBody = new CompletableFuture<>();
private final SerialExecutor serialExecutor;

public LogWatchCallback(OutputStream out, Executor executor) {
public LogWatchCallback(OutputStream out, OperationContext context) {
this.out = out;
if (out != null) {
outChannel = Channels.newChannel(out);
}
this.serialExecutor = new SerialExecutor(executor);
this.context = context;
this.serialExecutor = new SerialExecutor(context.getExecutor());
}

@Override
Expand All @@ -71,7 +74,10 @@ private void cleanUp() {

public LogWatchCallback callAndWait(HttpClient client, URL url) {
HttpRequest request = client.newHttpRequestBuilder().url(url).build();
HttpClient clone = client.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build();
HttpClient clone = client.newBuilder()
.tag(Optional.ofNullable(context.getRequestConfig()).map(RequestConfigBuilder::new).orElse(new RequestConfigBuilder())
.withRequestTimeout(0).build())
.readTimeout(0, TimeUnit.MILLISECONDS).build();

if (out == null) {
// we can pass the input stream directly to the consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.RequestConfigBuilder;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -72,6 +73,7 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation<T, L,
final ListOptions listOptions, final Watcher<T> watcher, final int reconnectInterval, final int reconnectLimit,
long websocketTimeout) throws MalformedURLException {
super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, () -> client.newBuilder()
.tag(new RequestConfigBuilder(baseOperation.getRequestConfig()).withRequestTimeout(0).build())
.readTimeout(websocketTimeout, TimeUnit.MILLISECONDS)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.RequestConfigBuilder;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
Expand Down Expand Up @@ -48,6 +49,7 @@ public WatchHTTPManager(final HttpClient client,
super(
watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval,
() -> client.newBuilder()
.tag(new RequestConfigBuilder(baseOperation.getRequestConfig()).withRequestTimeout(0).build())
.readTimeout(0, TimeUnit.MILLISECONDS)
.forStreaming()
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.LocalPortForward;
import io.fabric8.kubernetes.client.PortForward;
import io.fabric8.kubernetes.client.RequestConfigBuilder;
import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable;
import io.fabric8.kubernetes.client.dsl.CopyOrReadable;
import io.fabric8.kubernetes.client.dsl.EphemeralContainersResource;
Expand Down Expand Up @@ -97,7 +98,7 @@ public class PodOperationsImpl extends HasMetadataOperation<Pod, PodList, PodRes
private static final String[] EMPTY_COMMAND = { "/bin/sh", "-i" };
public static final String DEFAULT_CONTAINER_ANNOTATION_NAME = "kubectl.kubernetes.io/default-container";

static final transient Logger LOG = LoggerFactory.getLogger(PodOperationsImpl.class);
static final Logger LOG = LoggerFactory.getLogger(PodOperationsImpl.class);

private final PodOperationContext podOperationContext;

Expand Down Expand Up @@ -181,7 +182,7 @@ public LogWatch watchLog(OutputStream out) {
getContext().getReadyWaitTimeout() != null ? getContext().getReadyWaitTimeout() : DEFAULT_POD_READY_WAIT_TIMEOUT);
// Issue Pod Logs HTTP request
URL url = new URL(URLUtils.join(getResourceUrl().toString(), getContext().getLogParameters() + "&follow=true"));
final LogWatchCallback callback = new LogWatchCallback(out, this.context.getExecutor());
final LogWatchCallback callback = new LogWatchCallback(out, context);
return callback.callAndWait(httpClient, url);
} catch (IOException ioException) {
throw KubernetesClientException.launderThrowable(forOperationType("watchLog"), ioException);
Expand Down Expand Up @@ -375,7 +376,10 @@ private boolean hasEphemeralContainer(List<EphemeralContainer> containers, Strin
}

private ExecWebSocketListener setupConnectionToPod(URI uri) {
HttpClient clone = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build();
HttpClient clone = httpClient.newBuilder()
.tag(new RequestConfigBuilder(getRequestConfig()).withRequestTimeout(0).build())
.readTimeout(0, TimeUnit.MILLISECONDS)
.build();
ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(getContext(), this.context.getExecutor());
CompletableFuture<WebSocket> startedFuture = clone.newWebSocketBuilder()
.subprotocol("v4.channel.k8s.io")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public LogWatch watchLog(OutputStream out) {
// In case of DeploymentConfig we directly get logs at DeploymentConfig Url, but we need to wait for Pods
waitUntilDeploymentConfigPodBecomesReady(get());
URL url = getResourceLogUrl(true);
final LogWatchCallback callback = new LogWatchCallback(out, this.context.getExecutor());
final LogWatchCallback callback = new LogWatchCallback(out, context);
return callback.callAndWait(this.httpClient, url);
} catch (Throwable t) {
throw KubernetesClientException.launderThrowable(forOperationType("watchLog"), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public Typeable<Triggerable<WebHookTrigger, Void>> withSecret(String secret) {
protected Build submitToApiServer(InputStream inputStream, long contentLength) {
try {
HttpClient newClient = this.httpClient.newBuilder()
.tag(getOperationContext().getRequestConfig())
.readTimeout(getOperationContext().getTimeout(), getOperationContext().getTimeoutUnit())
.writeTimeout(getOperationContext().getTimeout(), getOperationContext().getTimeoutUnit())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public LogWatch watchLog(OutputStream out) {
// In case of Build we directly get logs at Build Url, but we need to wait for Pods
waitUntilBuildPodBecomesReady(get());
URL url = new URL(URLUtils.join(getResourceUrl().toString(), getLogParameters() + "&follow=true"));
final LogWatchCallback callback = new LogWatchCallback(out, this.context.getExecutor());
final LogWatchCallback callback = new LogWatchCallback(out, context);
return callback.callAndWait(this.httpClient, url);
} catch (IOException t) {
throw KubernetesClientException.launderThrowable(forOperationType("watchLog"), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,10 @@ public boolean supportsOpenShiftAPIGroup(String apiGroup) {
protected void setDerivedFields() {
OpenShiftConfig wrapped = OpenShiftConfig.wrap(config);
this.config = wrapped;
HttpClient.DerivedClientBuilder builder = httpClient.newBuilder().authenticatorNone();
HttpClient.DerivedClientBuilder builder = httpClient.newBuilder();
this.httpClient = builder
.authenticatorNone()
.tag(config.getRequestConfig())
.addOrReplaceInterceptor(TokenRefreshInterceptor.NAME,
new OpenShiftOAuthInterceptor(httpClient, wrapped))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private boolean setAuthHeader(BasicBuilder builder, String token) {

private CompletableFuture<String> authorize() {
HttpClient.DerivedClientBuilder builder = client.newBuilder();
builder.tag(config.getRequestConfig());
builder.addOrReplaceInterceptor(TokenRefreshInterceptor.NAME, null);
HttpClient clone = builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void setUp() {
when(response.uri()).thenReturn(URI.create("https://localhost:8443/"));

when(httpClient.newBuilder()
.tag(any())
.readTimeout(anyLong(), any())
.writeTimeout(anyLong(), any())
.build()).thenReturn(httpClient);
Expand Down

0 comments on commit 8cf4804

Please sign in to comment.