Skip to content

fix: startup all resource indexing #2881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private InformerEventSource(
parseResourceVersions);
// If there is a primary to secondary mapper there is no need for primary to secondary index.
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (primaryToSecondaryMapper == null) {
if (useSecondaryToPrimaryIndex()) {
primaryToSecondaryIndex =
// The index uses the secondary to primary mapper (always present) to build the index
new DefaultPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
Expand Down Expand Up @@ -157,6 +157,14 @@ public void onDelete(R resource, boolean b) {
}
}

@Override
public synchronized void start() {
super.start();
// this makes sure that on first reconciliation all resources are
// present on the index
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
}

private synchronized void onAddOrUpdate(
Operation operation, R newObject, R oldObject, Runnable superOnOp) {
var resourceID = ResourceID.fromResource(newObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ void setup() {
.thenReturn(mock(SecondaryToPrimaryMapper.class));
when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class);

informerEventSource = new InformerEventSource<>(informerEventSourceConfiguration, clientMock);
informerEventSource =
new InformerEventSource<>(informerEventSourceConfiguration, clientMock) {
// mocking start
@Override
public synchronized void start() {}
};

var mockControllerConfig = mock(ControllerConfiguration.class);
when(mockControllerConfig.getConfigurationService()).thenReturn(new BaseConfigurationService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public <T extends HasMetadata> T create(T resource) {
return kubernetesClient.resource(resource).inNamespace(namespace).create();
}

public <T extends HasMetadata> T serverSideApply(T resource) {
return kubernetesClient.resource(resource).inNamespace(namespace).serverSideApply();
}

public <T extends HasMetadata> T replace(T resource) {
return kubernetesClient.resource(resource).inNamespace(namespace).replace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class LocallyRunOperatorExtension extends AbstractOperatorExtension {
private final List<Class<? extends CustomResource>> additionalCustomResourceDefinitions;
private final Map<Reconciler, RegisteredController> registeredControllers;
private final Map<String, String> crdMappings;
private final Consumer<LocallyRunOperatorExtension> beforeStartHook;

private LocallyRunOperatorExtension(
List<ReconcilerSpec> reconcilers,
Expand All @@ -68,7 +69,8 @@ private LocallyRunOperatorExtension(
Consumer<ConfigurationServiceOverrider> configurationServiceOverrider,
Function<ExtensionContext, String> namespaceNameSupplier,
Function<ExtensionContext, String> perClassNamespaceNameSupplier,
List<String> additionalCrds) {
List<String> additionalCrds,
Consumer<LocallyRunOperatorExtension> beforeStartHook) {
super(
infrastructure,
infrastructureTimeout,
Expand All @@ -82,6 +84,7 @@ private LocallyRunOperatorExtension(
this.portForwards = portForwards;
this.localPortForwards = new ArrayList<>(portForwards.size());
this.additionalCustomResourceDefinitions = additionalCustomResourceDefinitions;
this.beforeStartHook = beforeStartHook;
configurationServiceOverrider =
configurationServiceOverrider != null
? configurationServiceOverrider.andThen(
Expand Down Expand Up @@ -298,6 +301,10 @@ protected void before(ExtensionContext context) {
});
crdMappings.clear();

if (beforeStartHook != null) {
beforeStartHook.accept(this);
}

LOGGER.debug("Starting the operator locally");
this.operator.start();
}
Expand Down Expand Up @@ -356,6 +363,7 @@ public static class Builder extends AbstractBuilder<Builder> {
private final List<PortForwardSpec> portForwards;
private final List<Class<? extends CustomResource>> additionalCustomResourceDefinitions;
private final List<String> additionalCRDs = new ArrayList<>();
private Consumer<LocallyRunOperatorExtension> beforeStartHook;
private KubernetesClient kubernetesClient;

protected Builder() {
Expand Down Expand Up @@ -424,6 +432,15 @@ public Builder withAdditionalCRD(String... paths) {
return this;
}

/**
* Used to initialize resources when the namespace is generated but the operator is not started
* yet.
*/
public Builder withBeforeStartHook(Consumer<LocallyRunOperatorExtension> beforeStartHook) {
this.beforeStartHook = beforeStartHook;
return this;
}

public LocallyRunOperatorExtension build() {
return new LocallyRunOperatorExtension(
reconcilers,
Expand All @@ -438,7 +455,8 @@ public LocallyRunOperatorExtension build() {
configurationServiceOverrider,
namespaceNameSupplier,
perClassNamespaceNameSupplier,
additionalCRDs);
additionalCRDs,
beforeStartHook);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("ssac")
public class StartupSecondaryAccessCustomResource extends CustomResource<Void, Void>
implements Namespaced {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess;

import java.util.Map;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;

import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessReconciler.LABEL_KEY;
import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessReconciler.LABEL_VALUE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class StartupSecondaryAccessIT {

public static final int SECONDARY_NUMBER = 200;

@RegisterExtension
static LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withReconciler(new StartupSecondaryAccessReconciler())
.withBeforeStartHook(
ex -> {
var primary = new StartupSecondaryAccessCustomResource();
primary.setMetadata(new ObjectMetaBuilder().withName("test1").build());
primary = ex.serverSideApply(primary);

for (int i = 0; i < SECONDARY_NUMBER; i++) {
ConfigMap cm = new ConfigMap();
cm.setMetadata(
new ObjectMetaBuilder()
.withLabels(Map.of(LABEL_KEY, LABEL_VALUE))
.withNamespace(ex.getNamespace())
.withName("cm" + i)
.build());
cm.addOwnerReference(primary);
ex.serverSideApply(cm);
}
})
.build();

@Test
void reconcilerSeeAllSecondaryResources() {
var reconciler = extension.getReconcilerOfType(StartupSecondaryAccessReconciler.class);

await().untilAsserted(() -> assertThat(reconciler.isReconciled()).isTrue());

assertThat(reconciler.isSecondaryAndCacheSameAmount()).isTrue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;

import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessIT.SECONDARY_NUMBER;

@ControllerConfiguration
public class StartupSecondaryAccessReconciler
implements Reconciler<StartupSecondaryAccessCustomResource> {

private static final Logger log = LoggerFactory.getLogger(StartupSecondaryAccessReconciler.class);

public static final String LABEL_KEY = "app";
public static final String LABEL_VALUE = "secondary-test";

private InformerEventSource<ConfigMap, StartupSecondaryAccessCustomResource> cmInformer;

private boolean secondaryAndCacheSameAmount = true;
private boolean reconciled = false;

@Override
public UpdateControl<StartupSecondaryAccessCustomResource> reconcile(
StartupSecondaryAccessCustomResource resource,
Context<StartupSecondaryAccessCustomResource> context) {

var secondary = context.getSecondaryResources(ConfigMap.class);
var cached = cmInformer.list().toList();

log.info(
"Secondary number: {}, cached: {}, expected: {}",
secondary.size(),
cached.size(),
SECONDARY_NUMBER);

if (secondary.size() != cached.size()) {
secondaryAndCacheSameAmount = false;
}
reconciled = true;
return UpdateControl.noUpdate();
}

@Override
public List<EventSource<?, StartupSecondaryAccessCustomResource>> prepareEventSources(
EventSourceContext<StartupSecondaryAccessCustomResource> context) {
cmInformer =
new InformerEventSource<>(
InformerEventSourceConfiguration.from(
ConfigMap.class, StartupSecondaryAccessCustomResource.class)
.withLabelSelector(LABEL_KEY + "=" + LABEL_VALUE)
.build(),
context);
return List.of(cmInformer);
}

public boolean isSecondaryAndCacheSameAmount() {
return secondaryAndCacheSameAmount;
}

public boolean isReconciled() {
return reconciled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ void dependentResourceCanReferenceEachOther() {
assertThat(operator.get(Secret.class, TEST_RESOURCE_NAME + i)).isNotNull();
}
});

for (int i = 0; i < EXECUTION_NUMBER; i++) {
operator.delete(testResource(i));
}
await()
.timeout(Duration.ofSeconds(30))
.untilAsserted(
() -> {
for (int i = 0; i < EXECUTION_NUMBER; i++) {
assertThat(
operator.get(
DependentResourceCrossRefResource.class,
testResource(i).getMetadata().getName()))
.isNull();
}
});
}

DependentResourceCrossRefResource testResource(int n) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Secret;
Expand All @@ -26,6 +29,8 @@
@ControllerConfiguration
public class DependentResourceCrossRefReconciler
implements Reconciler<DependentResourceCrossRefResource> {
private static final Logger log =
LoggerFactory.getLogger(DependentResourceCrossRefReconciler.class);

public static final String SECRET_NAME = "secret";
private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
Expand All @@ -48,6 +53,7 @@ public ErrorStatusUpdateControl<DependentResourceCrossRefResource> updateErrorSt
DependentResourceCrossRefResource resource,
Context<DependentResourceCrossRefResource> context,
Exception e) {
log.error("Status update on error", e);
errorHappened = true;
return ErrorStatusUpdateControl.noStatusUpdate();
}
Expand Down