diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 6215c20179..99bb280ae4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -28,8 +28,6 @@ import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Secret; -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.CustomResource; @@ -46,6 +44,8 @@ import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; +import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSIONS; + /** An interface from which to retrieve configuration information. */ public interface ConfigurationService { @@ -448,61 +448,16 @@ default Set> defaultNonSSAResource() { } /** - * If a javaoperatorsdk.io/previous annotation should be used so that the operator sdk can detect - * events from its own updates of dependent resources and then filter them. + * If the event logic can compare resourceVersions. * - *

Disable this if you want to react to your own dependent resource updates + *

Enabled by default as Kubernetes does support this interpretation of resourceVersions. + * Disable only if your api server provides non comparable resource versions. * - * @return if special annotation should be used for dependent resource to filter events - * @since 4.5.0 + * @return if resource versions are comparable + * @since 5.3.0 */ - default boolean previousAnnotationForDependentResourcesEventFiltering() { - return true; - } - - /** - * For dependent resources, the framework can add an annotation to filter out events resulting - * directly from the framework's operation. There are, however, some resources that do not follow - * the Kubernetes API conventions that changes in metadata should not increase the generation of - * the resource (as recorded in the {@code generation} field of the resource's {@code metadata}). - * For these resources, this convention is not respected and results in a new event for the - * framework to process. If that particular case is not handled correctly in the resource matcher, - * the framework will consider that the resource doesn't match the desired state and therefore - * triggers an update, which in turn, will re-add the annotation, thus starting the loop again, - * infinitely. - * - *

As a workaround, we automatically skip adding previous annotation for those well-known - * resources. Note that if you are sure that the matcher works for your use case, and it should in - * most instances, you can remove the resource type from the blocklist. - * - *

The consequence of adding a resource type to the set is that the framework will not use - * event filtering to prevent events, initiated by changes made by the framework itself as a - * result of its processing of dependent resources, to trigger the associated reconciler again. - * - *

Note that this method only takes effect if annotating dependent resources to prevent - * dependent resources events from triggering the associated reconciler again is activated as - * controlled by {@link #previousAnnotationForDependentResourcesEventFiltering()} - * - * @return a Set of resource classes where the previous version annotation won't be used. - */ - default Set> withPreviousAnnotationForDependentResourcesBlocklist() { - return Set.of(Deployment.class, StatefulSet.class); - } - - /** - * If the event logic should parse the resourceVersion to determine the ordering of dependent - * resource events. This is typically not needed. - * - *

Disabled by default as Kubernetes does not support, and discourages, this interpretation of - * resourceVersions. Enable only if your api server event processing seems to lag the operator - * logic, and you want to further minimize the amount of work done / updates issued by the - * operator. - * - * @return if resource version should be parsed (as integer) - * @since 4.5.0 - */ - default boolean parseResourceVersionsForEventFilteringAndCaching() { - return false; + default boolean comparableResourceVersions() { + return DEFAULT_COMPARABLE_RESOURCE_VERSIONS; } /** diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 3d29bb6589..81a5428044 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -51,11 +51,9 @@ public class ConfigurationServiceOverrider { private Duration reconciliationTerminationTimeout; private Boolean ssaBasedCreateUpdateMatchForDependentResources; private Set> defaultNonSSAResource; - private Boolean previousAnnotationForDependentResources; - private Boolean parseResourceVersions; + private Boolean comparableResourceVersions; private Boolean useSSAToPatchPrimaryResource; private Boolean cloneSecondaryResourcesWhenGettingFromCache; - private Set> previousAnnotationUsageBlocklist; @SuppressWarnings("rawtypes") private DependentResourceFactory dependentResourceFactory; @@ -168,28 +166,23 @@ public ConfigurationServiceOverrider withDefaultNonSSAResource( return this; } - public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(boolean value) { - this.previousAnnotationForDependentResources = value; - return this; - } - /** * @param value true if internal algorithms can use metadata.resourceVersion as a numeric value. * @return this */ - public ConfigurationServiceOverrider withParseResourceVersions(boolean value) { - this.parseResourceVersions = value; + public ConfigurationServiceOverrider withComparableResourceVersions(boolean value) { + this.comparableResourceVersions = value; return this; } /** - * @deprecated use withParseResourceVersions + * @deprecated use withComparableResourceVersions * @param value true if internal algorithms can use metadata.resourceVersion as a numeric value. * @return this */ @Deprecated(forRemoval = true) - public ConfigurationServiceOverrider wihtParseResourceVersions(boolean value) { - this.parseResourceVersions = value; + public ConfigurationServiceOverrider withParseResourceVersions(boolean value) { + this.comparableResourceVersions = value; return this; } @@ -204,12 +197,6 @@ public ConfigurationServiceOverrider withCloneSecondaryResourcesWhenGettingFromC return this; } - public ConfigurationServiceOverrider withPreviousAnnotationForDependentResourcesBlocklist( - Set> blocklist) { - this.previousAnnotationUsageBlocklist = blocklist; - return this; - } - public ConfigurationService build() { return new BaseConfigurationService(original.getVersion(), cloner, client) { @Override @@ -331,20 +318,6 @@ public Set> defaultNonSSAResources() { defaultNonSSAResource, ConfigurationService::defaultNonSSAResources); } - @Override - public boolean previousAnnotationForDependentResourcesEventFiltering() { - return overriddenValueOrDefault( - previousAnnotationForDependentResources, - ConfigurationService::previousAnnotationForDependentResourcesEventFiltering); - } - - @Override - public boolean parseResourceVersionsForEventFilteringAndCaching() { - return overriddenValueOrDefault( - parseResourceVersions, - ConfigurationService::parseResourceVersionsForEventFilteringAndCaching); - } - @Override public boolean useSSAToPatchPrimaryResource() { return overriddenValueOrDefault( @@ -359,11 +332,9 @@ public boolean cloneSecondaryResourcesWhenGettingFromCache() { } @Override - public Set> - withPreviousAnnotationForDependentResourcesBlocklist() { + public boolean comparableResourceVersions() { return overriddenValueOrDefault( - previousAnnotationUsageBlocklist, - ConfigurationService::withPreviousAnnotationForDependentResourcesBlocklist); + comparableResourceVersions, ConfigurationService::comparableResourceVersions); } }; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java index bca605a41c..c6ea21f0c0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java @@ -33,6 +33,7 @@ import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; +import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSIONS; import static io.javaoperatorsdk.operator.api.reconciler.Constants.SAME_AS_CONTROLLER_NAMESPACES_SET; import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACE_SET; import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE_SET; @@ -96,18 +97,21 @@ class DefaultInformerEventSourceConfiguration private final GroupVersionKind groupVersionKind; private final InformerConfiguration informerConfig; private final KubernetesClient kubernetesClient; + private final boolean comparableResourceVersions; protected DefaultInformerEventSourceConfiguration( GroupVersionKind groupVersionKind, PrimaryToSecondaryMapper primaryToSecondaryMapper, SecondaryToPrimaryMapper secondaryToPrimaryMapper, InformerConfiguration informerConfig, - KubernetesClient kubernetesClient) { + KubernetesClient kubernetesClient, + boolean comparableResourceVersions) { this.informerConfig = Objects.requireNonNull(informerConfig); this.groupVersionKind = groupVersionKind; this.primaryToSecondaryMapper = primaryToSecondaryMapper; this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; this.kubernetesClient = kubernetesClient; + this.comparableResourceVersions = comparableResourceVersions; } @Override @@ -135,6 +139,11 @@ public Optional getGroupVersionKind() { public Optional getKubernetesClient() { return Optional.ofNullable(kubernetesClient); } + + @Override + public boolean comparableResourceVersions() { + return this.comparableResourceVersions; + } } @SuppressWarnings({"unused", "UnusedReturnValue"}) @@ -148,6 +157,7 @@ class Builder { private PrimaryToSecondaryMapper primaryToSecondaryMapper; private SecondaryToPrimaryMapper secondaryToPrimaryMapper; private KubernetesClient kubernetesClient; + private boolean comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSIONS; private Builder(Class resourceClass, Class primaryResourceClass) { this(resourceClass, primaryResourceClass, null); @@ -285,6 +295,11 @@ public Builder withFieldSelector(FieldSelector fieldSelector) { return this; } + public Builder withComparableResourceVersions(boolean comparableResourceVersions) { + this.comparableResourceVersions = comparableResourceVersions; + return this; + } + public void updateFrom(InformerConfiguration informerConfig) { if (informerConfig != null) { final var informerConfigName = informerConfig.getName(); @@ -324,7 +339,10 @@ public InformerEventSourceConfiguration build() { HasMetadata.getKind(primaryResourceClass), false)), config.build(), - kubernetesClient); + kubernetesClient, + comparableResourceVersions); } } + + boolean comparableResourceVersions(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java index 052b4d8c44..ed975d71ef 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java @@ -41,6 +41,7 @@ public final class Constants { public static final String RESOURCE_GVK_KEY = "josdk.resource.gvk"; public static final String CONTROLLER_NAME = "controller.name"; public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true; + public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSIONS = true; private Constants() {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index b4b3405ec4..72f0c8da36 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -451,6 +451,11 @@ public static

P addFinalizerWithSSA( } } + public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) { + return compareResourceVersions( + h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion()); + } + public static int compareResourceVersions(String v1, String v2) { var v1Length = v1.length(); if (v1Length == 0) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index efe25b9a43..1189642d35 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -55,7 +55,6 @@ public abstract class KubernetesDependentResource kubernetesDependentResourceConfig; private volatile Boolean useSSA; - private volatile Boolean usePreviousAnnotationForEventFiltering; public KubernetesDependentResource() {} @@ -72,6 +71,27 @@ public void configureWith(KubernetesDependentResourceConfig config) { this.kubernetesDependentResourceConfig = config; } + @Override + protected R handleCreate(R desired, P primary, Context

context) { + return eventSource() + .orElseThrow() + .updateAndCacheResource( + desired, + context, + toCreate -> KubernetesDependentResource.super.handleCreate(toCreate, primary, context)); + } + + @Override + protected R handleUpdate(R actual, R desired, P primary, Context

context) { + return eventSource() + .orElseThrow() + .updateAndCacheResource( + desired, + context, + toUpdate -> + KubernetesDependentResource.super.handleUpdate(actual, toUpdate, primary, context)); + } + @SuppressWarnings("unused") public R create(R desired, P primary, Context

context) { if (useSSA(context)) { @@ -158,14 +178,6 @@ protected void addMetadata( } else { annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY); } - } else if (usePreviousAnnotation(context)) { // set a new one - eventSource() - .orElseThrow() - .addPreviousAnnotation( - Optional.ofNullable(actualResource) - .map(r -> r.getMetadata().getResourceVersion()) - .orElse(null), - target); } addReferenceHandlingMetadata(target, primary); } @@ -181,22 +193,6 @@ protected boolean useSSA(Context

context) { return useSSA; } - private boolean usePreviousAnnotation(Context

context) { - if (usePreviousAnnotationForEventFiltering == null) { - usePreviousAnnotationForEventFiltering = - context - .getControllerConfiguration() - .getConfigurationService() - .previousAnnotationForDependentResourcesEventFiltering() - && !context - .getControllerConfiguration() - .getConfigurationService() - .withPreviousAnnotationForDependentResourcesBlocklist() - .contains(this.resourceType()); - } - return usePreviousAnnotationForEventFiltering; - } - @Override protected void handleDelete(P primary, R secondary, Context

context) { if (secondary != null) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index b7a6406e20..59d86efe48 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -47,7 +47,11 @@ public class ControllerEventSource @SuppressWarnings({"unchecked", "rawtypes"}) public ControllerEventSource(Controller controller) { - super(NAME, controller.getCRClient(), controller.getConfiguration(), false); + super( + NAME, + controller.getCRClient(), + controller.getConfiguration(), + controller.getConfiguration().getConfigurationService().comparableResourceVersions()); this.controller = controller; final var config = controller.getConfiguration(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index ec11db25f4..d46dd0669a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -17,7 +17,7 @@ import java.util.Optional; import java.util.Set; -import java.util.UUID; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -35,6 +35,8 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; +import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSIONS; + /** * Wraps informer(s) so they are connected to the eventing system of the framework. Note that since * this is built on top of Fabric8 client Informers, it also supports caching resources using @@ -78,28 +80,24 @@ public class InformerEventSource // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryIndex primaryToSecondaryIndex; private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; - private final String id = UUID.randomUUID().toString(); public InformerEventSource( InformerEventSourceConfiguration configuration, EventSourceContext

context) { this( configuration, configuration.getKubernetesClient().orElse(context.getClient()), - context - .getControllerConfiguration() - .getConfigurationService() - .parseResourceVersionsForEventFilteringAndCaching()); + configuration.comparableResourceVersions()); } InformerEventSource(InformerEventSourceConfiguration configuration, KubernetesClient client) { - this(configuration, client, false); + this(configuration, client, DEFAULT_COMPARABLE_RESOURCE_VERSIONS); } @SuppressWarnings({"unchecked", "rawtypes"}) private InformerEventSource( InformerEventSourceConfiguration configuration, KubernetesClient client, - boolean parseResourceVersions) { + boolean comparableResourceVersions) { super( configuration.name(), configuration @@ -107,7 +105,7 @@ private InformerEventSource( .map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind())) .orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())), configuration, - parseResourceVersions); + comparableResourceVersions); // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); if (useSecondaryToPrimaryIndex()) { @@ -125,6 +123,22 @@ private InformerEventSource( genericFilter = informerConfig.getGenericFilter(); } + public R updateAndCacheResource( + R resourceToUpdate, Context context, UnaryOperator updateMethod) { + ResourceID id = ResourceID.fromResource(resourceToUpdate); + if (log.isDebugEnabled()) { + log.debug("Update and cache: {}", id); + } + try { + temporaryResourceCache.startModifying(id); + var updated = updateMethod.apply(resourceToUpdate); + handleRecentResourceUpdate(id, updated, resourceToUpdate); + return updated; + } finally { + temporaryResourceCache.doneModifying(id); + } + } + @Override public void onAdd(R newResource) { if (log.isDebugEnabled()) { @@ -134,9 +148,7 @@ public void onAdd(R newResource) { resourceType().getSimpleName(), newResource.getMetadata().getResourceVersion()); } - primaryToSecondaryIndex.onAddOrUpdate(newResource); - onAddOrUpdate( - Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource)); + onAddOrUpdate(Operation.ADD, newResource, null); } @Override @@ -149,16 +161,11 @@ public void onUpdate(R oldObject, R newObject) { newObject.getMetadata().getResourceVersion(), oldObject.getMetadata().getResourceVersion()); } - primaryToSecondaryIndex.onAddOrUpdate(newObject); - onAddOrUpdate( - Operation.UPDATE, - newObject, - oldObject, - () -> InformerEventSource.super.onUpdate(oldObject, newObject)); + onAddOrUpdate(Operation.UPDATE, newObject, oldObject); } @Override - public void onDelete(R resource, boolean b) { + public synchronized void onDelete(R resource, boolean b) { if (log.isDebugEnabled()) { log.debug( "On delete event received for resource id: {} type: {}", @@ -180,68 +187,28 @@ public synchronized void start() { manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate); } - private synchronized void onAddOrUpdate( - Operation operation, R newObject, R oldObject, Runnable superOnOp) { + private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) { + primaryToSecondaryIndex.onAddOrUpdate(newObject); var resourceID = ResourceID.fromResource(newObject); - if (canSkipEvent(newObject, oldObject, resourceID)) { + if (temporaryResourceCache.onAddOrUpdateEvent(newObject)) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource" + " ID: {}", operation, ResourceID.fromResource(newObject)); - superOnOp.run(); + } else if (eventAcceptedByFilter(operation, newObject, oldObject)) { + log.debug( + "Propagating event for {}, resource with same version not result of a reconciliation." + + " Resource ID: {}", + operation, + resourceID); + propagateEvent(newObject); } else { - superOnOp.run(); - if (eventAcceptedByFilter(operation, newObject, oldObject)) { - log.debug( - "Propagating event for {}, resource with same version not result of a reconciliation." - + " Resource ID: {}", - operation, - resourceID); - propagateEvent(newObject); - } else { - log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); - } + log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); } } - private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) { - var res = temporaryResourceCache.getResourceFromCache(resourceID); - if (res.isEmpty()) { - return isEventKnownFromAnnotation(newObject, oldObject); - } - boolean resVersionsEqual = - newObject - .getMetadata() - .getResourceVersion() - .equals(res.get().getMetadata().getResourceVersion()); - log.debug( - "Resource found in temporal cache for id: {} resource versions equal: {}", - resourceID, - resVersionsEqual); - return resVersionsEqual - || temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject); - } - - private boolean isEventKnownFromAnnotation(R newObject, R oldObject) { - String previous = newObject.getMetadata().getAnnotations().get(PREVIOUS_ANNOTATION_KEY); - boolean known = false; - if (previous != null) { - String[] parts = previous.split(","); - if (id.equals(parts[0])) { - if (oldObject == null && parts.length == 1) { - known = true; - } else if (oldObject != null - && parts.length == 2 - && oldObject.getMetadata().getResourceVersion().equals(parts[1])) { - known = true; - } - } - } - return known; - } - private void propagateEvent(R object) { var primaryResourceIdSet = configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object); @@ -289,23 +256,19 @@ public Set getSecondaryResources(P primary) { } @Override - public synchronized void handleRecentResourceUpdate( + public void handleRecentResourceUpdate( ResourceID resourceID, R resource, R previousVersionOfResource) { handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource); } @Override - public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { + public void handleRecentResourceCreate(ResourceID resourceID, R resource) { handleRecentCreateOrUpdate(Operation.ADD, resource, null); } private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { primaryToSecondaryIndex.onAddOrUpdate(newResource); - temporaryResourceCache.putResource( - newResource, - Optional.ofNullable(oldResource) - .map(r -> r.getMetadata().getResourceVersion()) - .orElse(null)); + temporaryResourceCache.putResource(newResource); } private boolean useSecondaryToPrimaryIndex() { @@ -333,22 +296,6 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) { && (genericFilter == null || genericFilter.accept(resource)); } - /** - * Add an annotation to the resource so that the subsequent will be omitted - * - * @param resourceVersion null if there is no prior version - * @param target mutable resource that will be returned - */ - public R addPreviousAnnotation(String resourceVersion, R target) { - target - .getMetadata() - .getAnnotations() - .put( - PREVIOUS_ANNOTATION_KEY, - id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse("")); - return target; - } - private enum Operation { ADD, UPDATE diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 07db980fcd..096c201c12 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -156,6 +156,10 @@ public Optional get(ResourceID resourceID) { return Optional.ofNullable(cache.getByKey(getKey(resourceID))); } + public String getLastSyncResourceVersion() { + return this.informer.lastSyncResourceVersion(); + } + private String getKey(ResourceID resourceID) { return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 2679918b60..af30617d92 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -34,6 +34,7 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.Informable; import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; @@ -55,7 +56,7 @@ public abstract class ManagedInformerEventSource< private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); private InformerManager cache; - private final boolean parseResourceVersions; + private final boolean comparableResourceVersions; private ControllerConfiguration controllerConfiguration; private final C configuration; private final Map>> indexers = new HashMap<>(); @@ -63,9 +64,9 @@ public abstract class ManagedInformerEventSource< protected MixedOperation client; protected ManagedInformerEventSource( - String name, MixedOperation client, C configuration, boolean parseResourceVersions) { + String name, MixedOperation client, C configuration, boolean comparableResourceVersions) { super(configuration.getResourceClass(), name); - this.parseResourceVersions = parseResourceVersions; + this.comparableResourceVersions = comparableResourceVersions; this.client = client; this.configuration = configuration; } @@ -102,7 +103,7 @@ public synchronized void start() { if (isRunning()) { return; } - temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions); + temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions); this.cache = new InformerManager<>(client, configuration, this); cache.setControllerConfiguration(controllerConfiguration); cache.addIndexers(indexers); @@ -122,30 +123,34 @@ public synchronized void stop() { @Override public void handleRecentResourceUpdate( ResourceID resourceID, R resource, R previousVersionOfResource) { - temporaryResourceCache.putResource( - resource, previousVersionOfResource.getMetadata().getResourceVersion()); + temporaryResourceCache.putResource(resource); } @Override public void handleRecentResourceCreate(ResourceID resourceID, R resource) { - temporaryResourceCache.putAddedResource(resource); + temporaryResourceCache.putResource(resource); } @Override public Optional get(ResourceID resourceID) { + var res = cache.get(resourceID); Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); - if (resource.isPresent()) { - log.debug("Resource found in temporary cache for Resource ID: {}", resourceID); + if (comparableResourceVersions + && resource.isPresent() + && res.filter( + r -> + PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow()) + > 0) + .isEmpty()) { + log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID); return resource; - } else { - log.debug( - "Resource not found in temporary cache reading it from informer cache," - + " for Resource ID: {}", - resourceID); - var res = cache.get(resourceID); - log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID); - return res; } + log.debug( + "Resource not found, or older, in temporary cache. Found in informer cache {}, for" + + " Resource ID: {}", + res.isPresent(), + resourceID); + return res; } @SuppressWarnings("unused") diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 06226ae4ba..d918be447d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -15,16 +15,16 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -33,157 +33,152 @@ * a create or update is executed the subsequent getResource operation might not return the * up-to-date resource from informer cache, since it is not received yet. * - *

The idea of the solution is, that since an update (for create is simpler) was done - * successfully, and optimistic locking is in place, there were no other operations between reading - * the resource from the cache and the actual update. So when the new resource is stored in the - * temporal cache only if the informer still has the previous resource version, from before the - * update. If not, that means there were already updates on the cache (either by the actual update - * from DependentResource or other) so the resource does not needs to be cached. Subsequently if - * event received from the informer, it means that the cache of the informer was updated, so it - * already contains a more fresh version of the resource. + *

Since an update (for create is simpler) was done successfully we can temporarily track that + * resource if its version is later than the events we've processed. We then know that we can skip + * all events that have the same resource version or earlier than the tracked resource. Once we + * process an event that has the same resource version or later, then we know the tracked resource + * can be removed. + * + *

In some cases it is possible for the informer to deliver events prior to the attempt to put + * the resource in the temporal cache. The startModifying/doneModifying methods are used to pause + * event delivery to ensure that temporal cache recognizes the put entry as an event that can be + * skipped. + * + *

If comparable resource versions are disabled, then this cache is effectively disabled. * * @param resource to cache. */ public class TemporaryResourceCache { - static class ExpirationCache { - private final LinkedHashMap cache; - private final int ttlMs; - - public ExpirationCache(int maxEntries, int ttlMs) { - this.ttlMs = ttlMs; - this.cache = - new LinkedHashMap<>() { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > maxEntries; - } - }; - } + private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); - public void add(K key) { - clean(); - cache.putIfAbsent(key, System.currentTimeMillis()); - } + private final Map cache = new ConcurrentHashMap<>(); + private final boolean comparableResourceVersions; + private final Map activelyModifying = new ConcurrentHashMap<>(); + private String latestResourceVersion; - public boolean contains(K key) { - clean(); - return cache.get(key) != null; - } + public TemporaryResourceCache(boolean comparableResourceVersions) { + this.comparableResourceVersions = comparableResourceVersions; + } - void clean() { - if (!cache.isEmpty()) { - long currentTimeMillis = System.currentTimeMillis(); - var iter = cache.entrySet().iterator(); - // the order will already be from oldest to newest, clean a fixed number of entries to - // amortize the cost amongst multiple calls - for (int i = 0; i < 10 && iter.hasNext(); i++) { - var entry = iter.next(); - if (currentTimeMillis - entry.getValue() > ttlMs) { - iter.remove(); - } - } - } + public void startModifying(ResourceID id) { + if (!comparableResourceVersions) { + return; } + activelyModifying + .compute( + id, + (ignored, lock) -> { + if (lock != null) { + throw new IllegalStateException(); // concurrent modifications to the same resource + // not allowed - this could be relaxed if needed + } + return new ReentrantLock(); + }) + .lock(); } - private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); - - private final Map cache = new ConcurrentHashMap<>(); - - // keep up to the last million deletions for up to 10 minutes - private final ExpirationCache tombstones = new ExpirationCache<>(1000000, 1200000); - private final ManagedInformerEventSource managedInformerEventSource; - private final boolean parseResourceVersions; - - public TemporaryResourceCache( - ManagedInformerEventSource managedInformerEventSource, - boolean parseResourceVersions) { - this.managedInformerEventSource = managedInformerEventSource; - this.parseResourceVersions = parseResourceVersions; + public void doneModifying(ResourceID id) { + if (!comparableResourceVersions) { + return; + } + activelyModifying.computeIfPresent( + id, + (ignored, lock) -> { + lock.unlock(); + return null; + }); } - public synchronized void onDeleteEvent(T resource, boolean unknownState) { - tombstones.add(resource.getMetadata().getUid()); + public void onDeleteEvent(T resource, boolean unknownState) { onEvent(resource, unknownState); } - public synchronized void onAddOrUpdateEvent(T resource) { - onEvent(resource, false); + /** + * @return true if the resourceVersion was already known + */ + public boolean onAddOrUpdateEvent(T resource) { + return onEvent(resource, false); } - synchronized void onEvent(T resource, boolean unknownState) { - cache.computeIfPresent( - ResourceID.fromResource(resource), - (id, cached) -> - (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); + private boolean onEvent(T resource, boolean unknownState) { + ReentrantLock lock = activelyModifying.get(ResourceID.fromResource(resource)); + if (lock != null) { + lock.lock(); // wait for the modification to finish + lock.unlock(); // simply unlock as the event is guaranteed after the modification + } + boolean[] known = new boolean[1]; + synchronized (this) { + if (!unknownState) { + latestResourceVersion = resource.getMetadata().getResourceVersion(); + } + cache.computeIfPresent( + ResourceID.fromResource(resource), + (id, cached) -> { + boolean remove = unknownState; + if (!unknownState) { + int comp = PrimaryUpdateAndCacheUtils.compareResourceVersions(resource, cached); + if (comp >= 0) { + remove = true; + } + if (comp <= 0) { + known[0] = true; + } + } + if (remove) { + return null; + } + return cached; + }); + return known[0]; + } } - public synchronized void putAddedResource(T newResource) { - putResource(newResource, null); - } + /** put the item into the cache if it's for a later state than what has already been observed. */ + public synchronized void putResource(T newResource) { + if (!comparableResourceVersions) { + return; + } - /** - * put the item into the cache if the previousResourceVersion matches the current state. If not - * the currently cached item is removed. - * - * @param previousResourceVersion null indicates an add - */ - public synchronized void putResource(T newResource, String previousResourceVersion) { var resourceId = ResourceID.fromResource(newResource); - var cachedResource = managedInformerEventSource.get(resourceId).orElse(null); - - boolean moveAhead = false; - if (previousResourceVersion == null && cachedResource == null) { - if (tombstones.contains(newResource.getMetadata().getUid())) { - log.debug( - "Won't resurrect uid {} for resource id: {}", - newResource.getMetadata().getUid(), - resourceId); - return; - } - // we can skip further checks as this is a simple add and there's no previous entry to - // consider - moveAhead = true; + + if (newResource.getMetadata().getResourceVersion() == null) { + log.warn( + "Resource {}: with no resourceVersion put in temporary cache. This is not the expected" + + " usage pattern, only resources returned from the api server should be put in the" + + " cache.", + resourceId); + return; } - if (moveAhead - || (cachedResource != null - && (cachedResource - .getMetadata() - .getResourceVersion() - .equals(previousResourceVersion)) - || isLaterResourceVersion(resourceId, newResource, cachedResource))) { + // check against the latestResourceVersion processed by the TemporaryResourceCache + // If the resource is older, then we can safely ignore. + // + // this also prevents resurrecting recently deleted entities for which the delete event + // has already been processed + if (latestResourceVersion != null + && PrimaryUpdateAndCacheUtils.compareResourceVersions( + latestResourceVersion, newResource.getMetadata().getResourceVersion()) + > 0) { log.debug( - "Temporarily moving ahead to target version {} for resource id: {}", + "Resource {}: resourceVersion {} is not later than latest {}", + resourceId, newResource.getMetadata().getResourceVersion(), - resourceId); - cache.put(resourceId, newResource); - } else if (cache.remove(resourceId) != null) { - log.debug("Removed an obsolete resource from cache for id: {}", resourceId); + latestResourceVersion); + return; } - } - /** - * @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()} - * is enabled and the resourceVersion of newResource is numerically greater than - * cachedResource, otherwise false - */ - public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) { - try { - if (parseResourceVersions - && Long.parseLong(newResource.getMetadata().getResourceVersion()) - > Long.parseLong(cachedResource.getMetadata().getResourceVersion())) { - return true; - } - } catch (NumberFormatException e) { + // also make sure that we're later than the existing temporary entry + var cachedResource = getResourceFromCache(resourceId).orElse(null); + + if (cachedResource == null + || PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) > 0) { log.debug( - "Could not compare resourceVersions {} and {} for {}", + "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), - cachedResource.getMetadata().getResourceVersion(), resourceId); + cache.put(resourceId, newResource); } - return false; } public synchronized Optional getResourceFromCache(ResourceID resourceID) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 208d6aeaaa..f54e47304b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -94,31 +94,18 @@ public synchronized void start() {} } @Test - void skipsEventPropagationIfResourceWithSameVersionInResourceCache() { + void skipsEventPropagation() { when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(testDeployment())); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(true); + informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); verify(eventHandlerMock, never()).handleEvent(any()); } - @Test - void skipsAddEventPropagationViaAnnotation() { - informerEventSource.onAdd(informerEventSource.addPreviousAnnotation(null, testDeployment())); - - verify(eventHandlerMock, never()).handleEvent(any()); - } - - @Test - void skipsUpdateEventPropagationViaAnnotation() { - informerEventSource.onUpdate( - testDeployment(), informerEventSource.addPreviousAnnotation("1", testDeployment())); - - verify(eventHandlerMock, never()).handleEvent(any()); - } - @Test void processEventPropagationWithoutAnnotation() { informerEventSource.onUpdate(testDeployment(), testDeployment()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index e3dc2c82e4..4b12148015 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -16,10 +16,10 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Map; -import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,49 +27,40 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; class TemporaryPrimaryResourceCacheTest { public static final String RESOURCE_VERSION = "2"; - @SuppressWarnings("unchecked") - private InformerEventSource informerEventSource; - private TemporaryResourceCache temporaryResourceCache; @BeforeEach void setup() { - informerEventSource = mock(InformerEventSource.class); - temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false); + temporaryResourceCache = new TemporaryResourceCache<>(true); } @Test void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() { var testResource = testResource(); var prevTestResource = testResource(); - prevTestResource.getMetadata().setResourceVersion("0"); - when(informerEventSource.get(any())).thenReturn(Optional.of(prevTestResource)); + prevTestResource.getMetadata().setResourceVersion("1"); - temporaryResourceCache.putResource(testResource, "0"); + temporaryResourceCache.putResource(testResource); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isPresent(); } @Test - void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() { + void updateNotAddsTheResourceIntoCacheIfLaterVersionKnown() { var testResource = testResource(); - var informerCachedResource = testResource(); - informerCachedResource.getMetadata().setResourceVersion("x"); - when(informerEventSource.get(any())).thenReturn(Optional.of(informerCachedResource)); - temporaryResourceCache.putResource(testResource, "0"); + temporaryResourceCache.onAddOrUpdateEvent( + testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build()); + + temporaryResourceCache.putResource(testResource); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isNotPresent(); @@ -78,9 +69,8 @@ void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() { @Test void addOperationAddsTheResourceIfInformerCacheStillEmpty() { var testResource = testResource(); - when(informerEventSource.get(any())).thenReturn(Optional.empty()); - temporaryResourceCache.putAddedResource(testResource); + temporaryResourceCache.putResource(testResource); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isPresent(); @@ -89,46 +79,79 @@ void addOperationAddsTheResourceIfInformerCacheStillEmpty() { @Test void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() { var testResource = testResource(); - when(informerEventSource.get(any())).thenReturn(Optional.of(testResource())); - temporaryResourceCache.putAddedResource(testResource); + temporaryResourceCache.putResource(testResource); + + temporaryResourceCache.putResource( + new ConfigMapBuilder(testResource) + .editMetadata() + .withResourceVersion("1") + .endMetadata() + .build()); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); - assertThat(cached).isNotPresent(); + assertThat(cached.orElseThrow().getMetadata().getResourceVersion()).isEqualTo(RESOURCE_VERSION); } @Test void removesResourceFromCache() { ConfigMap testResource = propagateTestResourceToCache(); - temporaryResourceCache.onAddOrUpdateEvent(testResource()); + temporaryResourceCache.onAddOrUpdateEvent( + new ConfigMapBuilder(testResource) + .editMetadata() + .withResourceVersion("3") + .endMetadata() + .build()); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isNotPresent(); } @Test - void resourceVersionParsing() { - this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true); + void nonComparableResourceVersionsDisables() { + this.temporaryResourceCache = new TemporaryResourceCache<>(false); - ConfigMap testResource = propagateTestResourceToCache(); + this.temporaryResourceCache.putResource(testResource()); - // an event with a newer version will not remove - temporaryResourceCache.onAddOrUpdateEvent( - new ConfigMapBuilder(testResource) - .editMetadata() - .withResourceVersion("1") - .endMetadata() - .build()); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource()))) + .isEmpty(); + } - assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) - .isPresent(); + @Test + void lockedEventBeforePut() throws Exception { + var testResource = testResource(); - // anything else will remove - temporaryResourceCache.onAddOrUpdateEvent(testResource()); + temporaryResourceCache.startModifying(ResourceID.fromResource(testResource)); - assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) - .isNotPresent(); + ExecutorService ex = Executors.newSingleThreadExecutor(); + try { + var result = ex.submit(() -> temporaryResourceCache.onAddOrUpdateEvent(testResource)); + + temporaryResourceCache.putResource(testResource); + assertThat(result.isDone()).isFalse(); + temporaryResourceCache.doneModifying(ResourceID.fromResource(testResource)); + assertThat(result.get(10, TimeUnit.SECONDS)).isTrue(); + } finally { + ex.shutdownNow(); + } + } + + @Test + void putBeforeEvent() { + var testResource = testResource(); + + // first ensure an event is not known + var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + assertThat(result).isFalse(); + + var nextResource = testResource(); + nextResource.getMetadata().setResourceVersion("3"); + temporaryResourceCache.putResource(nextResource); + + // now expect an event with the matching resourceVersion to be known after the put + result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + assertThat(result).isTrue(); } @Test @@ -143,45 +166,15 @@ void rapidDeletion() { .endMetadata() .build(), false); - temporaryResourceCache.putAddedResource(testResource); + temporaryResourceCache.putResource(testResource); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isEmpty(); } - @Test - void expirationCacheMax() { - ExpirationCache cache = new ExpirationCache<>(2, Integer.MAX_VALUE); - - cache.add(1); - cache.add(2); - cache.add(3); - - assertThat(cache.contains(1)).isFalse(); - assertThat(cache.contains(2)).isTrue(); - assertThat(cache.contains(3)).isTrue(); - } - - @Test - void expirationCacheTtl() { - ExpirationCache cache = new ExpirationCache<>(2, 1); - - cache.add(1); - cache.add(2); - - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .untilAsserted( - () -> { - assertThat(cache.contains(1)).isFalse(); - assertThat(cache.contains(2)).isFalse(); - }); - } - private ConfigMap propagateTestResourceToCache() { var testResource = testResource(); - when(informerEventSource.get(any())).thenReturn(Optional.empty()); - temporaryResourceCache.putAddedResource(testResource); + temporaryResourceCache.putResource(testResource); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isPresent(); return testResource; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java similarity index 91% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java index f80c75b84b..f683e439c9 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java @@ -20,14 +20,13 @@ import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; -class PreviousAnnotationDisabledIT { +class ComparableResourceVersionsDisabledIT { @RegisterExtension LocallyRunOperatorExtension operator = LocallyRunOperatorExtension.builder() .withReconciler(new CreateUpdateEventFilterTestReconciler()) - .withConfigurationService( - overrider -> overrider.withPreviousAnnotationForDependentResources(false)) + .withConfigurationService(overrider -> overrider.withComparableResourceVersions(false)) .build(); @Test diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java index d1a03b0c8e..e102addc51 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java @@ -104,13 +104,15 @@ private void createExternalResource( .withData(Map.of(ID_KEY, createdResource.getId())) .build(); configMap.addOwnerReference(resource); - context.getClient().configMaps().resource(configMap).create(); var primaryID = ResourceID.fromResource(resource); // Making sure that the created resources are in the cache for the next reconciliation. // This is critical in this case, since on next reconciliation if it would not be in the cache // it would be created again. - configMapEventSource.handleRecentResourceCreate(primaryID, configMap); + configMapEventSource.updateAndCacheResource( + configMap, + context, + toCreate -> context.getClient().configMaps().resource(toCreate).create()); externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource); } @@ -128,6 +130,7 @@ public DeleteControl cleanup( return DeleteControl.defaultDelete(); } + @Override public int getNumberOfExecutions() { return numberOfExecutions.get(); }