Skip to content

Commit 8558c1a

Browse files
committed
remove the use of the previous annotation via locking
Signed-off-by: Steve Hawkins <[email protected]>
1 parent c310e6b commit 8558c1a

File tree

10 files changed

+142
-181
lines changed

10 files changed

+142
-181
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -447,19 +447,6 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
447447
return defaultNonSSAResources();
448448
}
449449

450-
/**
451-
* If a javaoperatorsdk.io/previous annotation should be used so that the operator sdk can detect
452-
* events from its own updates of dependent resources and then filter them.
453-
*
454-
* <p>Disable this if you want to react to your own dependent resource updates
455-
*
456-
* @return if special annotation should be used for dependent resource to filter events
457-
* @since 4.5.0
458-
*/
459-
default boolean previousAnnotationForDependentResourcesEventFiltering() {
460-
return true;
461-
}
462-
463450
/**
464451
* For dependent resources, the framework can add an annotation to filter out events resulting
465452
* directly from the framework's operation. There are, however, some resources that do not follow

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -331,13 +331,6 @@ public Set<Class<? extends HasMetadata>> defaultNonSSAResources() {
331331
defaultNonSSAResource, ConfigurationService::defaultNonSSAResources);
332332
}
333333

334-
@Override
335-
public boolean previousAnnotationForDependentResourcesEventFiltering() {
336-
return overriddenValueOrDefault(
337-
previousAnnotationForDependentResources,
338-
ConfigurationService::previousAnnotationForDependentResourcesEventFiltering);
339-
}
340-
341334
@Override
342335
public boolean parseResourceVersionsForEventFilteringAndCaching() {
343336
return overriddenValueOrDefault(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public abstract class KubernetesDependentResource<R extends HasMetadata, P exten
5555
private final boolean garbageCollected = this instanceof GarbageCollected;
5656
private KubernetesDependentResourceConfig<R> kubernetesDependentResourceConfig;
5757
private volatile Boolean useSSA;
58-
private volatile Boolean usePreviousAnnotationForEventFiltering;
5958

6059
public KubernetesDependentResource() {}
6160

@@ -72,6 +71,27 @@ public void configureWith(KubernetesDependentResourceConfig<R> config) {
7271
this.kubernetesDependentResourceConfig = config;
7372
}
7473

74+
@Override
75+
protected R handleCreate(R desired, P primary, Context<P> context) {
76+
return eventSource()
77+
.orElseThrow()
78+
.updateAndCacheResource(
79+
desired,
80+
context,
81+
toCreate -> KubernetesDependentResource.super.handleCreate(toCreate, primary, context));
82+
}
83+
84+
@Override
85+
protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
86+
return eventSource()
87+
.orElseThrow()
88+
.updateAndCacheResource(
89+
desired,
90+
context,
91+
toUpdate ->
92+
KubernetesDependentResource.super.handleUpdate(actual, toUpdate, primary, context));
93+
}
94+
7595
@SuppressWarnings("unused")
7696
public R create(R desired, P primary, Context<P> context) {
7797
if (useSSA(context)) {
@@ -158,14 +178,6 @@ protected void addMetadata(
158178
} else {
159179
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
160180
}
161-
} else if (usePreviousAnnotation(context)) { // set a new one
162-
eventSource()
163-
.orElseThrow()
164-
.addPreviousAnnotation(
165-
Optional.ofNullable(actualResource)
166-
.map(r -> r.getMetadata().getResourceVersion())
167-
.orElse(null),
168-
target);
169181
}
170182
addReferenceHandlingMetadata(target, primary);
171183
}
@@ -181,22 +193,6 @@ protected boolean useSSA(Context<P> context) {
181193
return useSSA;
182194
}
183195

184-
private boolean usePreviousAnnotation(Context<P> context) {
185-
if (usePreviousAnnotationForEventFiltering == null) {
186-
usePreviousAnnotationForEventFiltering =
187-
context
188-
.getControllerConfiguration()
189-
.getConfigurationService()
190-
.previousAnnotationForDependentResourcesEventFiltering()
191-
&& !context
192-
.getControllerConfiguration()
193-
.getConfigurationService()
194-
.withPreviousAnnotationForDependentResourcesBlocklist()
195-
.contains(this.resourceType());
196-
}
197-
return usePreviousAnnotationForEventFiltering;
198-
}
199-
200196
@Override
201197
protected void handleDelete(P primary, R secondary, Context<P> context) {
202198
if (secondary != null) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 33 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import java.util.Optional;
1919
import java.util.Set;
20-
import java.util.UUID;
20+
import java.util.function.UnaryOperator;
2121
import java.util.stream.Collectors;
2222

2323
import org.slf4j.Logger;
@@ -78,7 +78,6 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7878
// we need direct control for the indexer to propagate the just update resource also to the index
7979
private final PrimaryToSecondaryIndex<R> primaryToSecondaryIndex;
8080
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
81-
private final String id = UUID.randomUUID().toString();
8281

8382
public InformerEventSource(
8483
InformerEventSourceConfiguration<R> configuration, EventSourceContext<P> context) {
@@ -122,6 +121,22 @@ private InformerEventSource(
122121
genericFilter = informerConfig.getGenericFilter();
123122
}
124123

124+
public R updateAndCacheResource(
125+
R resourceToUpdate, Context<?> context, UnaryOperator<R> updateMethod) {
126+
ResourceID id = ResourceID.fromResource(resourceToUpdate);
127+
if (log.isDebugEnabled()) {
128+
log.debug("Update and cache: {}", id);
129+
}
130+
try {
131+
temporaryResourceCache.startModifying(id);
132+
var updated = updateMethod.apply(resourceToUpdate);
133+
handleRecentResourceUpdate(id, updated, resourceToUpdate);
134+
return updated;
135+
} finally {
136+
temporaryResourceCache.doneModifying(id);
137+
}
138+
}
139+
125140
@Override
126141
public void onAdd(R newResource) {
127142
if (log.isDebugEnabled()) {
@@ -131,9 +146,7 @@ public void onAdd(R newResource) {
131146
resourceType().getSimpleName(),
132147
newResource.getMetadata().getResourceVersion());
133148
}
134-
primaryToSecondaryIndex.onAddOrUpdate(newResource);
135-
onAddOrUpdate(
136-
Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource));
149+
onAddOrUpdate(Operation.ADD, newResource, null);
137150
}
138151

139152
@Override
@@ -146,16 +159,11 @@ public void onUpdate(R oldObject, R newObject) {
146159
newObject.getMetadata().getResourceVersion(),
147160
oldObject.getMetadata().getResourceVersion());
148161
}
149-
primaryToSecondaryIndex.onAddOrUpdate(newObject);
150-
onAddOrUpdate(
151-
Operation.UPDATE,
152-
newObject,
153-
oldObject,
154-
() -> InformerEventSource.super.onUpdate(oldObject, newObject));
162+
onAddOrUpdate(Operation.UPDATE, newObject, oldObject);
155163
}
156164

157165
@Override
158-
public void onDelete(R resource, boolean b) {
166+
public synchronized void onDelete(R resource, boolean b) {
159167
if (log.isDebugEnabled()) {
160168
log.debug(
161169
"On delete event received for resource id: {} type: {}",
@@ -177,53 +185,26 @@ public synchronized void start() {
177185
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
178186
}
179187

180-
private synchronized void onAddOrUpdate(
181-
Operation operation, R newObject, R oldObject, Runnable superOnOp) {
188+
private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) {
189+
primaryToSecondaryIndex.onAddOrUpdate(newObject);
182190
var resourceID = ResourceID.fromResource(newObject);
183191

184-
if (canSkipEvent(newObject, oldObject, resourceID)) {
192+
if (temporaryResourceCache.onAddOrUpdateEvent(newObject)) {
185193
log.debug(
186194
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
187195
+ " ID: {}",
188196
operation,
189197
ResourceID.fromResource(newObject));
190-
superOnOp.run();
198+
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
199+
log.debug(
200+
"Propagating event for {}, resource with same version not result of a reconciliation."
201+
+ " Resource ID: {}",
202+
operation,
203+
resourceID);
204+
propagateEvent(newObject);
191205
} else {
192-
superOnOp.run();
193-
if (eventAcceptedByFilter(operation, newObject, oldObject)) {
194-
log.debug(
195-
"Propagating event for {}, resource with same version not result of a reconciliation."
196-
+ " Resource ID: {}",
197-
operation,
198-
resourceID);
199-
propagateEvent(newObject);
200-
} else {
201-
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
202-
}
203-
}
204-
}
205-
206-
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
207-
return temporaryResourceCache.canSkipEvent(resourceID, newObject)
208-
|| isEventKnownFromAnnotation(newObject, oldObject);
209-
}
210-
211-
private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {
212-
String previous = newObject.getMetadata().getAnnotations().get(PREVIOUS_ANNOTATION_KEY);
213-
boolean known = false;
214-
if (previous != null) {
215-
String[] parts = previous.split(",");
216-
if (id.equals(parts[0])) {
217-
if (oldObject == null && parts.length == 1) {
218-
known = true;
219-
} else if (oldObject != null
220-
&& parts.length == 2
221-
&& oldObject.getMetadata().getResourceVersion().equals(parts[1])) {
222-
known = true;
223-
}
224-
}
206+
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
225207
}
226-
return known;
227208
}
228209

229210
private void propagateEvent(R object) {
@@ -273,13 +254,13 @@ public Set<R> getSecondaryResources(P primary) {
273254
}
274255

275256
@Override
276-
public synchronized void handleRecentResourceUpdate(
257+
public void handleRecentResourceUpdate(
277258
ResourceID resourceID, R resource, R previousVersionOfResource) {
278259
handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource);
279260
}
280261

281262
@Override
282-
public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) {
263+
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
283264
handleRecentCreateOrUpdate(Operation.ADD, resource, null);
284265
}
285266

@@ -313,22 +294,6 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
313294
&& (genericFilter == null || genericFilter.accept(resource));
314295
}
315296

316-
/**
317-
* Add an annotation to the resource so that the subsequent will be omitted
318-
*
319-
* @param resourceVersion null if there is no prior version
320-
* @param target mutable resource that will be returned
321-
*/
322-
public R addPreviousAnnotation(String resourceVersion, R target) {
323-
target
324-
.getMetadata()
325-
.getAnnotations()
326-
.put(
327-
PREVIOUS_ANNOTATION_KEY,
328-
id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse(""));
329-
return target;
330-
}
331-
332297
private enum Operation {
333298
ADD,
334299
UPDATE

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,6 @@ public Optional<R> get(ResourceID resourceID) {
221221
: r);
222222
}
223223

224-
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
225-
return getSource(namespace.orElse(WATCH_ALL_NAMESPACES))
226-
.map(source -> source.getLastSyncResourceVersion());
227-
}
228-
229224
@Override
230225
public Stream<ResourceID> keys() {
231226
return sources.values().stream().flatMap(Cache::keys);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public synchronized void start() {
103103
if (isRunning()) {
104104
return;
105105
}
106-
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
106+
temporaryResourceCache = new TemporaryResourceCache<>(parseResourceVersions);
107107
this.cache = new InformerManager<>(client, configuration, this);
108108
cache.setControllerConfiguration(controllerConfiguration);
109109
cache.addIndexers(indexers);
@@ -153,10 +153,6 @@ public Optional<R> get(ResourceID resourceID) {
153153
return res;
154154
}
155155

156-
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
157-
return cache.getLastSyncResourceVersion(namespace);
158-
}
159-
160156
@SuppressWarnings("unused")
161157
public Optional<R> getCachedValue(ResourceID resourceID) {
162158
return get(resourceID);

0 commit comments

Comments
 (0)