Skip to content

Commit

Permalink
switching to an informer based implementation
Browse files Browse the repository at this point in the history
also fully removing the additional operation/context fields
  • Loading branch information
shawkins committed Jun 26, 2021
1 parent 7ded399 commit a8271e1
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 543 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public class KubernetesClientTimeoutException extends KubernetesClientException
private static final String KNOWS_RESOURCES_FORMAT = "Timed out waiting for [%d] milliseconds for multiple resources. %s";

private final List<HasMetadata> resourcesNotReady;

public KubernetesClientTimeoutException(String kind, String name, String namespace, long amount, TimeUnit timeUnit) {
super(String.format(RESOURCE_FORMAT, timeUnit.toMillis(amount), kind, name, namespace));
this.resourcesNotReady = Collections.emptyList();
}

public KubernetesClientTimeoutException(HasMetadata resource, long amount, TimeUnit timeUnit) {
super(String.format(RESOURCE_FORMAT, timeUnit.toMillis(amount), resource.getKind(), resource.getMetadata().getName(), resource.getMetadata().getNamespace()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

public interface Waitable<T, P> {

@Deprecated
long DEFAULT_INITIAL_BACKOFF_MILLIS = 5L;
@Deprecated
double DEFAULT_BACKOFF_MULTIPLIER = 2d;

T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.WritableOperation;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import org.slf4j.Logger;
Expand All @@ -38,6 +37,7 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.OperationInfo;
import io.fabric8.kubernetes.client.ResourceNotFoundException;
import io.fabric8.kubernetes.client.Watch;
Expand All @@ -59,7 +59,6 @@
import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import io.fabric8.kubernetes.client.utils.HttpClientUtils;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.URLUtils;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.client.utils.WatcherToggle;
Expand All @@ -73,11 +72,11 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -89,8 +88,6 @@
import okhttp3.HttpUrl;
import okhttp3.Request;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceList<T>, R extends Resource<T>>
extends OperationSupport
implements
Expand Down Expand Up @@ -1123,34 +1120,49 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedExcept
@Override
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit)
throws InterruptedException {

CompletableFuture<T> future = new CompletableFuture<>();

try (SharedIndexInformer<T> informer = this.createInformer(0)) {
informer.addEventHandler(new ResourceEventHandler<T>() {

void test(T obj) {
try {
if (condition.test(obj)) {
future.complete(obj);
informer.stop(); // immediately stop the watch
}
} catch (Exception e) {
future.completeExceptionally(e);
}
}

@Override
public void onAdd(T obj) {
test(obj);
}

long remainingNanosToWait = timeUnit.toNanos(amount);
while (remainingNanosToWait > 0) {

T item = fromServer().get();
if (condition.test(item)) {
return item;
}
@Override
public void onUpdate(T oldObj, T newObj) {
test(newObj);
}

final WaitForConditionWatcher<T> watcher = new WaitForConditionWatcher<>(condition);
final long startTime = System.nanoTime();
try (Watch ignored = watch(KubernetesResourceUtil.getResourceVersion(item), watcher)) {
return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WatcherException && ((WatcherException)cause).isHttpGone()) {
LOG.debug("Restarting the watch due to http gone");
remainingNanosToWait -= (System.nanoTime() - startTime);
continue;
@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
test(null);
}
throw KubernetesClientException.launderThrowable(cause);
} catch (TimeoutException e) {
break;
});
informer.run();
return future.get(amount, timeUnit);
} catch (ExecutionException e) {
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (TimeoutException e) {
T item = getItem();
if (item != null) {
throw new KubernetesClientTimeoutException(item, amount, timeUnit);
}
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
}

LOG.debug("ran out of time waiting for watcher, wait condition not met");
throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!");
}

public void setType(Class<T> type) {
Expand Down Expand Up @@ -1179,30 +1191,38 @@ public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexer

@Override
public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
// convert the name into something listable
FilterWatchListDeletable<T, L> baseOperation =
getName() == null ? this : withFields(Collections.singletonMap("metadata.name", getName()));
DefaultSharedIndexInformer<T, L> informer = createInformer(resync);
if (handler != null) {
informer.addEventHandler(handler);
}
// synchronous start list/watch must succeed in the calling thread
informer.run();
return informer;
}

private DefaultSharedIndexInformer<T, L> createInformer(long resync) {
T item = getItem();
String name = (Utils.isNotNullOrEmpty(getName()) || item != null) ? checkName(item) : null;

// use the local context / namespace
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), new ListerWatcher<T, L>() {
@Override
public L list(ListOptions params, String namespace, OperationContext context) {
return baseOperation.list(params);
// convert the name into something listable
if (name != null) {
params.setFieldSelector("metadata.name="+name);
}
return BaseOperation.this.list(params);
}

@Override
public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> watcher) {
return baseOperation.watch(params, watcher);
return BaseOperation.this.watch(params, watcher);
}
}, resync, context, Runnable::run); // just run the event notification in the websocket thread
if (handler != null) {
informer.addEventHandler(handler);
}
if (indexers != null) {
informer.addIndexers(indexers);
}
// synchronous start list/watch must succeed in the calling thread
informer.run();
return informer;
}
}
Expand Down

This file was deleted.

Loading

0 comments on commit a8271e1

Please sign in to comment.