Skip to content

fix: primary cache utils mechanism #2814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.function.UnaryOperator;

import org.slf4j.Logger;
Expand All @@ -25,6 +27,8 @@
public class PrimaryUpdateAndCacheUtils {

public static final int DEFAULT_MAX_RETRY = 10;
public static final int DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS = 10000;
public static final int DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS = 50;

private PrimaryUpdateAndCacheUtils() {}

Expand Down Expand Up @@ -90,8 +94,10 @@ public static <P extends HasMetadata> P ssaPatchStatusAndCacheResource(
}

/**
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator,
* int)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}.
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator, int,
* long,long)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY} and
* default cache maximum polling time and period as defined, respectively by {@link
* #DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS} and {@link #DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS}.
*
* @param resourceToUpdate original resource to update
* @param context of reconciliation
Expand All @@ -106,7 +112,13 @@ public static <P extends HasMetadata> P updateAndCacheResource(
UnaryOperator<P> modificationFunction,
UnaryOperator<P> updateMethod) {
return updateAndCacheResource(
resourceToUpdate, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY);
resourceToUpdate,
context,
modificationFunction,
updateMethod,
DEFAULT_MAX_RETRY,
DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS,
DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS);
}

/**
Expand All @@ -124,16 +136,20 @@ public static <P extends HasMetadata> P updateAndCacheResource(
* @param modificationFunction modifications to make on primary
* @param updateMethod the update method implementation
* @param maxRetry maximum number of retries before giving up
* @param cachePollTimeoutMillis maximum amount of milliseconds to wait for the updated resource
* to appear in cache
* @param cachePollPeriodMillis cache polling period, in milliseconds
* @param <P> primary type
* @return the updated resource
*/
@SuppressWarnings("unchecked")
public static <P extends HasMetadata> P updateAndCacheResource(
P resourceToUpdate,
Context<P> context,
UnaryOperator<P> modificationFunction,
UnaryOperator<P> updateMethod,
int maxRetry) {
int maxRetry,
long cachePollTimeoutMillis,
long cachePollPeriodMillis) {

if (log.isDebugEnabled()) {
log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate));
Expand Down Expand Up @@ -180,14 +196,37 @@ public static <P extends HasMetadata> P updateAndCacheResource(
resourceToUpdate.getMetadata().getNamespace(),
e.getCode());
resourceToUpdate =
(P)
context
.getClient()
.resources(resourceToUpdate.getClass())
.inNamespace(resourceToUpdate.getMetadata().getNamespace())
.withName(resourceToUpdate.getMetadata().getName())
.get();
pollLocalCache(
context, resourceToUpdate, cachePollTimeoutMillis, cachePollPeriodMillis);
}
}
}

private static <P extends HasMetadata> P pollLocalCache(
Context<P> context, P staleResource, long timeoutMillis, long pollDelayMillis) {
try {
var resourceId = ResourceID.fromResource(staleResource);
var startTime = LocalTime.now();
final var timeoutTime = startTime.plus(timeoutMillis, ChronoUnit.MILLIS);
while (timeoutTime.isAfter(LocalTime.now())) {
log.debug("Polling cache for resource: {}", resourceId);
var cachedResource = context.getPrimaryCache().get(resourceId).orElseThrow();
if (!cachedResource
.getMetadata()
.getResourceVersion()
.equals(staleResource.getMetadata().getResourceVersion())) {
return context
.getControllerConfiguration()
.getConfigurationService()
.getResourceCloner()
.clone(cachedResource);
}
Thread.sleep(pollDelayMillis);
}
throw new OperatorException("Timeout of resource polling from cache for resource");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new OperatorException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
}
var resourceId = ResourceID.fromResource(newResource);
var cachedResource =
getResourceFromCache(resourceId)
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);

boolean moveAhead = false;
if (previousResourceVersion == null && cachedResource == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Optional;
import java.util.function.UnaryOperator;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.TestUtils;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
Expand All @@ -29,6 +35,7 @@ class PrimaryUpdateAndCacheUtilsTest {
Context<TestCustomResource> context = mock(Context.class);
KubernetesClient client = mock(KubernetesClient.class);
Resource resource = mock(Resource.class);
IndexedResourceCache<TestCustomResource> primaryCache = mock(IndexedResourceCache.class);

@BeforeEach
void setup() {
Expand All @@ -41,6 +48,20 @@ void setup() {
when(mixedOp.inNamespace(any())).thenReturn(mixedOp);
when(mixedOp.withName(any())).thenReturn(resource);
when(resource.get()).thenReturn(TestUtils.testCustomResource1());
when(context.getPrimaryCache()).thenReturn(primaryCache);

var controllerConfiguration = mock(ControllerConfiguration.class);
when(context.getControllerConfiguration()).thenReturn(controllerConfiguration);
var configService = mock(ConfigurationService.class);
when(controllerConfiguration.getConfigurationService()).thenReturn(configService);
when(configService.getResourceCloner())
.thenReturn(
new Cloner() {
@Override
public <R extends HasMetadata> R clone(R object) {
return new KubernetesSerialization().clone(object);
}
});
}

@Test
Expand Down Expand Up @@ -76,6 +97,10 @@ void retriesConflicts() {
when(updateOperation.apply(any()))
.thenThrow(new KubernetesClientException("", 409, null))
.thenReturn(TestUtils.testCustomResource1());
var freshResource = TestUtils.testCustomResource1();

freshResource.getMetadata().setResourceVersion("2");
when(primaryCache.get(any())).thenReturn(Optional.of(freshResource));

var updated =
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
Expand All @@ -89,15 +114,21 @@ void retriesConflicts() {
updateOperation);

assertThat(updated).isNotNull();
verify(resource, times(1)).get();
verify(primaryCache, times(1)).get(any());
}

@Test
void throwsIfRetryExhausted() {
var updateOperation = mock(UnaryOperator.class);

when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
var stubbing = when(primaryCache.get(any()));

for (int i = 0; i < DEFAULT_MAX_RETRY; i++) {
var resource = TestUtils.testCustomResource1();
resource.getMetadata().setResourceVersion("" + i);
stubbing = stubbing.thenReturn(Optional.of(resource));
}
assertThrows(
OperatorException.class,
() ->
Expand All @@ -106,6 +137,28 @@ void throwsIfRetryExhausted() {
context,
UnaryOperator.identity(),
updateOperation));
verify(resource, times(DEFAULT_MAX_RETRY)).get();
verify(primaryCache, times(DEFAULT_MAX_RETRY)).get(any());
}

@Test
void cachePollTimeouts() {
var updateOperation = mock(UnaryOperator.class);

when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
when(primaryCache.get(any())).thenReturn(Optional.of(TestUtils.testCustomResource1()));

var ex =
assertThrows(
OperatorException.class,
() ->
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
TestUtils.testCustomResource1(),
context,
UnaryOperator.identity(),
updateOperation,
2,
50L,
10L));
assertThat(ex.getMessage()).contains("Timeout");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@
@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("spwl")
public class StatusPatchCacheWithLockCustomResource
extends CustomResource<StatusPatchCacheWithLockSpec, StatusPatchCacheWithLockStatus>
implements Namespaced {}
public class StatusPatchCacheCustomResource
extends CustomResource<StatusPatchCacheSpec, StatusPatchCacheStatus> implements Namespaced {}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class StatusPatchCacheWithLockIT {
public class StatusPatchCacheIT {

public static final String TEST_1 = "test1";

@RegisterExtension
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withReconciler(StatusPatchCacheWithLockReconciler.class)
.withReconciler(StatusPatchCacheReconciler.class)
.build();

@Test
void testStatusAlwaysUpToDate() {
var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class);
var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class);

extension.create(testResource());

Expand All @@ -39,10 +39,10 @@ void testStatusAlwaysUpToDate() {
});
}

StatusPatchCacheWithLockCustomResource testResource() {
var res = new StatusPatchCacheWithLockCustomResource();
StatusPatchCacheCustomResource testResource() {
var res = new StatusPatchCacheCustomResource();
res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build());
res.setSpec(new StatusPatchCacheWithLockSpec());
res.setSpec(new StatusPatchCacheSpec());
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

@ControllerConfiguration
public class StatusPatchCacheWithLockReconciler
implements Reconciler<StatusPatchCacheWithLockCustomResource> {
public class StatusPatchCacheReconciler implements Reconciler<StatusPatchCacheCustomResource> {

public volatile int latestValue = 0;
public volatile boolean errorPresent = false;

@Override
public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
StatusPatchCacheWithLockCustomResource resource,
Context<StatusPatchCacheWithLockCustomResource> context) {
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {

if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) {
errorPresent = true;
Expand Down Expand Up @@ -50,22 +48,20 @@ public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
}

@Override
public List<EventSource<?, StatusPatchCacheWithLockCustomResource>> prepareEventSources(
EventSourceContext<StatusPatchCacheWithLockCustomResource> context) {
public List<EventSource<?, StatusPatchCacheCustomResource>> prepareEventSources(
EventSourceContext<StatusPatchCacheCustomResource> context) {
// periodic event triggering for testing purposes
return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache()));
}

private StatusPatchCacheWithLockCustomResource createFreshCopy(
StatusPatchCacheWithLockCustomResource resource) {
var res = new StatusPatchCacheWithLockCustomResource();
private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) {
var res = new StatusPatchCacheCustomResource();
res.setMetadata(
new ObjectMetaBuilder()
.withName(resource.getMetadata().getName())
.withNamespace(resource.getMetadata().getNamespace())
.build());
res.setStatus(new StatusPatchCacheWithLockStatus());

res.setStatus(new StatusPatchCacheStatus());
return res;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.javaoperatorsdk.operator.baseapi.statuscache;

public class StatusPatchCacheWithLockSpec {
public class StatusPatchCacheSpec {

private int counter = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.javaoperatorsdk.operator.baseapi.statuscache;

public class StatusPatchCacheWithLockStatus {
public class StatusPatchCacheStatus {

private Integer value = 0;

public Integer getValue() {
return value;
}

public StatusPatchCacheWithLockStatus setValue(Integer value) {
public StatusPatchCacheStatus setValue(Integer value) {
this.value = value;
return this;
}
Expand Down