From 2b490bf3811edc19868f89b85e8b3a8863640aba Mon Sep 17 00:00:00 2001 From: Toshiya Kobayashi Date: Fri, 23 Jun 2023 12:02:42 +0900 Subject: [PATCH] [DROOLS-7481] Add Cep complex test : ansible use cases : 2nd round - ReliabilityCepOnceWithinTest : ok - ReliabilityCepTimedOutTest : Had an issue with a nested object reference after deserialization - Introduce EqualityObjectStore - Introduce UUID in HashMapFactImpl for equals/hashCode - WIP --- .../core/common/EqualityObjectStore.java | 25 ++ .../facttemplate/HashMapFactImpl.java | 30 ++ .../core/ReliablePseudoClockScheduler.java | 5 +- ...impleSerializationReliableObjectStore.java | 4 +- .../ReliabilityCepOnceAfterTest.java | 32 +- .../ReliabilityCepOnceWithinTest.java | 203 ++++++++++++ .../ReliabilityCepTimedOutTest.java | 291 ++++++++++++++++++ .../infinispan/ReliabilityTestBasics.java | 29 +- 8 files changed, 591 insertions(+), 28 deletions(-) create mode 100644 drools-core/src/main/java/org/drools/core/common/EqualityObjectStore.java create mode 100644 drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceWithinTest.java create mode 100644 drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTimedOutTest.java diff --git a/drools-core/src/main/java/org/drools/core/common/EqualityObjectStore.java b/drools-core/src/main/java/org/drools/core/common/EqualityObjectStore.java new file mode 100644 index 00000000000..e6c74d89dbc --- /dev/null +++ b/drools-core/src/main/java/org/drools/core/common/EqualityObjectStore.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2023. Red Hat, Inc. and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.core.common; + +import java.util.HashMap; + +public class EqualityObjectStore extends MapObjectStore { + + public EqualityObjectStore() { + super(new HashMap<>()); + } + +} diff --git a/drools-model/drools-model-compiler/src/main/java/org/drools/modelcompiler/facttemplate/HashMapFactImpl.java b/drools-model/drools-model-compiler/src/main/java/org/drools/modelcompiler/facttemplate/HashMapFactImpl.java index a61f0fae49e..c4b5e5cca4a 100644 --- a/drools-model/drools-model-compiler/src/main/java/org/drools/modelcompiler/facttemplate/HashMapFactImpl.java +++ b/drools-model/drools-model-compiler/src/main/java/org/drools/modelcompiler/facttemplate/HashMapFactImpl.java @@ -19,6 +19,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.drools.base.facttemplates.Fact; import org.drools.base.facttemplates.FactTemplate; @@ -26,6 +27,8 @@ public class HashMapFactImpl implements Fact, PrototypeFact, Serializable { + protected final UUID uuid; + protected final FactTemplate factTemplate; protected final Map valuesMap; @@ -35,6 +38,7 @@ public HashMapFactImpl( FactTemplate factTemplate ) { } public HashMapFactImpl( FactTemplate factTemplate, Map valuesMap ) { + this.uuid = UUID.randomUUID(); this.factTemplate = factTemplate; this.valuesMap = valuesMap; } @@ -68,4 +72,30 @@ public Map asMap() { public String toString() { return "Fact " + factTemplate.getName() + " with values = " + valuesMap; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((uuid == null) ? 0 : uuid.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HashMapFactImpl other = (HashMapFactImpl) obj; + if (uuid == null) { + if (other.uuid != null) + return false; + } else if (!uuid.equals(other.uuid)) + return false; + return true; + } + } diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliablePseudoClockScheduler.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliablePseudoClockScheduler.java index 6d70432ad80..7ec1445005b 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliablePseudoClockScheduler.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliablePseudoClockScheduler.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.drools.core.common.Storage; +import org.drools.core.phreak.PhreakTimerNode.TimerNodeJob; import org.drools.core.reteoo.ObjectTypeNode.ExpireJob; import org.drools.core.time.impl.PseudoClockScheduler; import org.drools.core.time.impl.TimerJobInstance; @@ -65,11 +66,11 @@ private void updateStorage() { } /** - * ExpireJob is recreated by repropagate, so doesn't need to persist + * ExpireJob and TimerNodeJob are recreated by repropagate, so we don't need to persist */ public List createFilteredInternalQueueForPersistence(PriorityQueue queue) { return queue.stream() - .filter(job -> !(job.getJob() instanceof ExpireJob)) + .filter(job -> !(job.getJob() instanceof ExpireJob || job.getJob() instanceof TimerNodeJob)) .collect(Collectors.toList()); } } diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java index f8e9a4921fb..355256a5533 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java @@ -21,13 +21,13 @@ import org.drools.core.ClockType; import org.drools.core.common.DefaultEventHandle; -import org.drools.core.common.IdentityObjectStore; +import org.drools.core.common.EqualityObjectStore; import org.drools.core.common.InternalFactHandle; import org.drools.core.common.InternalWorkingMemory; import org.drools.core.common.InternalWorkingMemoryEntryPoint; import org.drools.core.common.Storage; -public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore { +public class SimpleSerializationReliableObjectStore extends EqualityObjectStore implements SimpleReliableObjectStore { protected final transient Storage storage; diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceAfterTest.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceAfterTest.java index 25fb272f539..e7b7510a546 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceAfterTest.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceAfterTest.java @@ -60,15 +60,11 @@ @ExtendWith(BeforeAllMethodExtension.class) class ReliabilityCepOnceAfterTest extends ReliabilityTestBasics { - public static final String RULE_TYPE_TAG = "RULE_TYPE"; - public static final String SYNTHETIC_RULE_TAG = "SYNTHETIC_RULE"; public static final String KEYWORD = "once_after"; public static final String RULE_NAME = "R"; /** - * These rules are created in the same way as in OnceAfterDefinition in drools-ansible-rulebook-integration - * - * @return + * These rules are created in the same way as OnceAfterDefinition in drools-ansible-rulebook-integration */ private Model ruleModel() { Prototype controlPrototype = getPrototype(SYNTHETIC_PROTOTYPE_NAME); @@ -81,7 +77,8 @@ private Model ruleModel() { List rules = new ArrayList<>(); // main rule (accumulate events within 10 minutes) - rules.add(rule(RULE_NAME).metadata(RULE_TYPE_TAG, KEYWORD) + rules.add( + rule(RULE_NAME).metadata(RULE_TYPE_TAG, KEYWORD) .build( protoPattern(controlVar1).expr("end_once_after", Index.ConstraintType.EQUAL, RULE_NAME), not(protoPattern(controlVar2).expr("start_once_after", Index.ConstraintType.EQUAL, RULE_NAME)), @@ -90,21 +87,22 @@ private Model ruleModel() { on(controlVar1, resultsVar, global).execute((drools, controlFact, resultFactList, globalResults) -> { drools.delete(controlFact); processResults(globalResults, resultFactList); - ((List) resultFactList).forEach(drools::delete); + resultFactList.forEach(drools::delete); }) ) ); // control rule (wrapping original event) PrototypeVariable originalEventVariable = variable(getPrototype(DEFAULT_PROTOTYPE_NAME), "m"); - rules.add(rule(RULE_NAME + "_control").metadata(SYNTHETIC_RULE_TAG, true) + rules.add( + rule(RULE_NAME + "_control").metadata(SYNTHETIC_RULE_TAG, true) .build( guardedPattern(originalEventVariable), not(duplicateControlPattern(originalEventVariable)), on(originalEventVariable).execute((drools, event) -> { Event controlEvent = createMapBasedEvent(controlPrototype); - controlEvent.set("sensu.host", event.get("sensu.host")); - controlEvent.set("sensu.process.type", event.get("sensu.process.type")); + controlEvent.set("sensu.host", event.get("sensu.host")); // groupByAttributes + controlEvent.set("sensu.process.type", event.get("sensu.process.type")); // groupByAttributes controlEvent.set("drools_rule_name", RULE_NAME); controlEvent.set("event", event); controlEvent.set("once_after_time_window", "10 minutes"); @@ -117,7 +115,8 @@ private Model ruleModel() { // start rule (insert start and end control events. start event expires in 10 minutes, so the main rule will fire) TimeAmount timeAmount = TimeAmount.parseTimeAmount("10 minutes"); - rules.add(rule(RULE_NAME + "_start").metadata(SYNTHETIC_RULE_TAG, true) + rules.add( + rule(RULE_NAME + "_start").metadata(SYNTHETIC_RULE_TAG, true) .build( protoPattern(controlVar1).expr("drools_rule_name", Index.ConstraintType.EQUAL, RULE_NAME), not(protoPattern(controlVar2).expr("end_once_after", Index.ConstraintType.EQUAL, RULE_NAME)), @@ -136,7 +135,8 @@ private Model ruleModel() { // cleanup duplicate events rule PrototypeDSL.PrototypePatternDef duplicateControlPattern = duplicateControlPattern(originalEventVariable); - rules.add(rule(RULE_NAME + "_cleanup_duplicate").metadata(SYNTHETIC_RULE_TAG, true) + rules.add( + rule(RULE_NAME + "_cleanup_duplicate").metadata(SYNTHETIC_RULE_TAG, true) .build( guardedPattern(originalEventVariable), duplicateControlPattern, @@ -158,8 +158,8 @@ private static PrototypeDSL.PrototypePatternDef guardedPattern(PrototypeVariable // We group-by sensu.host and sensu.process.type private static PrototypeDSL.PrototypePatternDef duplicateControlPattern(PrototypeVariable originalEventVariable) { return protoPattern(variable(getPrototype(SYNTHETIC_PROTOTYPE_NAME))) - .expr(prototypeField("sensu.host"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.host")) - .expr(prototypeField("sensu.process.type"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.process.type")) + .expr(prototypeField("sensu.host"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.host")) // groupByAttributes + .expr(prototypeField("sensu.process.type"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.process.type")) // groupByAttributes .expr(prototypeField("drools_rule_name"), Index.ConstraintType.EQUAL, fixedValue(RULE_NAME)); } @@ -222,13 +222,13 @@ void insertAdvanceFailoverFire_shouldRecoverFromFailover(PersistedSessionOption. assertThat(getResults()).as("once_after is 10 minutes window. The main rule should not be fired yet") .isEmpty(); - advanceTime(7, TimeUnit.MINUTES); + advanceTime(7, TimeUnit.MINUTES); // controlEvent expire job should be triggered, but the action is still in propagationList. Will be lost by server crash // advanceTime, then server crash before fireAllRules // Generally this doesn't happen in drools-ansible because AutomaticPseudoClock does advanceTime + fireAllRules atomically, but simulating a crash in the middle. failover(); - restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); // expire job is recreated and triggered fireAllRules(); diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceWithinTest.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceWithinTest.java new file mode 100644 index 00000000000..1da4ddf1b5c --- /dev/null +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepOnceWithinTest.java @@ -0,0 +1,203 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.reliability.infinispan; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.drools.base.facttemplates.Event; +import org.drools.model.DroolsEntryPoint; +import org.drools.model.Global; +import org.drools.model.Index; +import org.drools.model.Model; +import org.drools.model.Prototype; +import org.drools.model.PrototypeDSL; +import org.drools.model.PrototypeVariable; +import org.drools.model.Rule; +import org.drools.model.impl.ModelImpl; +import org.drools.modelcompiler.facttemplate.HashMapEventImpl; +import org.drools.reliability.infinispan.util.TimeAmount; +import org.junit.jupiter.api.condition.DisabledIf; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.kie.api.conf.EventProcessingOption; +import org.kie.api.runtime.conf.ClockTypeOption; +import org.kie.api.runtime.conf.PersistedSessionOption; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.groups.Tuple.tuple; +import static org.drools.model.DSL.globalOf; +import static org.drools.model.DSL.not; +import static org.drools.model.DSL.on; +import static org.drools.model.PatternDSL.rule; +import static org.drools.model.PrototypeDSL.protoPattern; +import static org.drools.model.PrototypeDSL.variable; +import static org.drools.model.PrototypeExpression.fixedValue; +import static org.drools.model.PrototypeExpression.prototypeField; +import static org.drools.modelcompiler.facttemplate.FactFactory.createMapBasedEvent; +import static org.drools.reliability.infinispan.util.PrototypeUtils.DEFAULT_PROTOTYPE_NAME; +import static org.drools.reliability.infinispan.util.PrototypeUtils.SYNTHETIC_PROTOTYPE_NAME; +import static org.drools.reliability.infinispan.util.PrototypeUtils.getPrototype; + +@DisabledIf("isProtoStream") +@ExtendWith(BeforeAllMethodExtension.class) +class ReliabilityCepOnceWithinTest extends ReliabilityTestBasics { + + public static final String KEYWORD = "once_within"; + public static final String RULE_NAME = "R"; + + /** + * These rules are created in the same way as OnceWithinDefinition in drools-ansible-rulebook-integration + */ + private Model ruleModel() { + Prototype controlPrototype = getPrototype(SYNTHETIC_PROTOTYPE_NAME); + Global global = globalOf(List.class, "defaultpkg", "results"); + + List rules = new ArrayList<>(); + + // main rule (match only once for grouped events within 10 minutes) + TimeAmount timeAmount = TimeAmount.parseTimeAmount("10 minutes"); + PrototypeVariable originalEventVariable = variable(getPrototype(DEFAULT_PROTOTYPE_NAME), "m"); + rules.add(rule(RULE_NAME).metadata(RULE_TYPE_TAG, KEYWORD) + .build( + guardedPattern(originalEventVariable), + not(duplicateControlPattern(originalEventVariable)), + on(originalEventVariable, global).execute((drools, event, globalResults) -> { + Event controlEvent = createMapBasedEvent(controlPrototype) + .withExpiration(timeAmount.getAmount(), timeAmount.getTimeUnit()); + controlEvent.set("sensu.host", event.get("sensu.host")); // groupByAttributes + controlEvent.set("sensu.process.type", event.get("sensu.process.type")); // groupByAttributes + controlEvent.set("drools_rule_name", RULE_NAME); + drools.insert(controlEvent); + globalResults.add(event); + drools.delete(event); + }) + ) + ); + + // cleanup duplicate events rule + rules.add(rule( "cleanup_" + RULE_NAME ).metadata(SYNTHETIC_RULE_TAG, true) + .build( + guardedPattern(originalEventVariable), + duplicateControlPattern(originalEventVariable), + on(originalEventVariable).execute(DroolsEntryPoint::delete) + ) + ); + + return new ModelImpl().withRules(rules).addGlobal(global); + } + + // This is the pattern which we want to match + private static PrototypeDSL.PrototypePatternDef guardedPattern(PrototypeVariable originalEventVariable) { + return protoPattern(originalEventVariable).expr(prototypeField("sensu.process.type"), Index.ConstraintType.EQUAL, fixedValue("alert")); + } + + // We group-by sensu.host and sensu.process.type + private static PrototypeDSL.PrototypePatternDef duplicateControlPattern(PrototypeVariable originalEventVariable) { + return protoPattern(variable(getPrototype(SYNTHETIC_PROTOTYPE_NAME))) + .expr(prototypeField("sensu.host"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.host")) // groupByAttributes + .expr(prototypeField("sensu.process.type"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.process.type")) // groupByAttributes + .expr(prototypeField("drools_rule_name"), Index.ConstraintType.EQUAL, fixedValue(RULE_NAME)); + } + + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") + void insertFailoverExpireFire_shouldCollectEventAfterWindow(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { + + createSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + insertMatchingSensuEvent("host1", "alert"); + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertNonMatchingSensuEvent("host1", "info"); + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertMatchingSensuEvent("host2", "alert"); + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertMatchingSensuEvent("host1", "alert"); // duplicate event + advanceTimeAndFire(1, TimeUnit.MINUTES); + + assertThat(getResults()).as("once_within is 10 minutes window. 2 events should be collected") + .hasSize(2); + + failover(); + restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + insertMatchingSensuEvent("host1", "alert"); // duplicate event + advanceTimeAndFire(1, TimeUnit.MINUTES); + + assertThat(getResults()).as("once_within is 10 minutes window. 2 events should be collected") + .hasSize(2); + + advanceTimeAndFire(10, TimeUnit.MINUTES); // controlEvent should be expired + + insertMatchingSensuEvent("host1", "alert"); // duplicate event, but 10 minutes window is over, so it should match + fireAllRules(); + + assertThat(getResults()).as("after 10 minutes window. 3 events should be collected") + .hasSize(3) + .allMatch(event -> event instanceof HashMapEventImpl) + .extracting(event -> ((HashMapEventImpl) event).get("sensu.host"), event -> ((HashMapEventImpl) event).get("sensu.process.type")) + .containsExactly(tuple("host1", "alert"), + tuple("host2", "alert"), + tuple("host1", "alert")); + } + + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") + void insertExpireFailoverFire_shouldCollectEventAfterWindow(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { + + createSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + insertMatchingSensuEvent("host1", "alert"); + + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertNonMatchingSensuEvent("host1", "info"); + + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertMatchingSensuEvent("host2", "alert"); + + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertMatchingSensuEvent("host1", "alert"); // duplicate event + + advanceTimeAndFire(1, TimeUnit.MINUTES); + + assertThat(getResults()).as("once_within is 10 minutes window. 2 events should be collected") + .hasSize(2); + + advanceTime(10, TimeUnit.MINUTES); // controlEvent expire job should be triggered, but the action is still in propagationList. Will be lost by server crash + + failover(); + restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); // expire job is recreated and triggered + + insertMatchingSensuEvent("host1", "alert"); // duplicate event, but 10 minutes window is over, so it should match + fireAllRules(); + + assertThat(getResults()).as("after 10 minutes window. 3 events should be collected") + .hasSize(3) + .allMatch(event -> event instanceof HashMapEventImpl) + .extracting(event -> ((HashMapEventImpl) event).get("sensu.host"), event -> ((HashMapEventImpl) event).get("sensu.process.type")) + .containsExactly(tuple("host1", "alert"), + tuple("host2", "alert"), + tuple("host1", "alert")); + } +} diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTimedOutTest.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTimedOutTest.java new file mode 100644 index 00000000000..7a7173e741d --- /dev/null +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTimedOutTest.java @@ -0,0 +1,291 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.reliability.infinispan; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.drools.base.facttemplates.Event; +import org.drools.model.Global; +import org.drools.model.Index; +import org.drools.model.Model; +import org.drools.model.Prototype; +import org.drools.model.PrototypeVariable; +import org.drools.model.Rule; +import org.drools.model.Variable; +import org.drools.model.impl.ModelImpl; +import org.drools.reliability.infinispan.util.TimeAmount; +import org.junit.jupiter.api.condition.DisabledIf; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.kie.api.conf.EventProcessingOption; +import org.kie.api.runtime.conf.ClockTypeOption; +import org.kie.api.runtime.conf.PersistedSessionOption; +import org.kie.api.runtime.rule.RuleContext; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.drools.model.DSL.accFunction; +import static org.drools.model.DSL.accumulate; +import static org.drools.model.DSL.after; +import static org.drools.model.DSL.declarationOf; +import static org.drools.model.DSL.globalOf; +import static org.drools.model.DSL.not; +import static org.drools.model.DSL.on; +import static org.drools.model.PatternDSL.pattern; +import static org.drools.model.PatternDSL.rule; +import static org.drools.model.PrototypeDSL.protoPattern; +import static org.drools.model.PrototypeDSL.variable; +import static org.drools.modelcompiler.facttemplate.FactFactory.createMapBasedEvent; +import static org.drools.reliability.infinispan.util.PrototypeUtils.DEFAULT_PROTOTYPE_NAME; +import static org.drools.reliability.infinispan.util.PrototypeUtils.SYNTHETIC_PROTOTYPE_NAME; +import static org.drools.reliability.infinispan.util.PrototypeUtils.getPrototype; + +@DisabledIf("isProtoStream") +@ExtendWith(BeforeAllMethodExtension.class) +class ReliabilityCepTimedOutTest extends ReliabilityTestBasics { + + public static final String KEYWORD = "timed_out"; + public static final String RULE_NAME = "R"; + + /** + * These rules are created in the same way as TimedOutDefinition in drools-ansible-rulebook-integration + */ + private Model ruleModel() { + Prototype controlPrototype = getPrototype(SYNTHETIC_PROTOTYPE_NAME); + PrototypeVariable controlVar1 = variable(controlPrototype, "c1"); + PrototypeVariable controlVar2 = variable(controlPrototype, "c2"); + PrototypeVariable controlVar3 = variable(controlPrototype, "c3"); + Variable resultCount = declarationOf( Long.class, "count" ); + Global global = globalOf(List.class, "defaultpkg", "results"); + + List rules = new ArrayList<>(); + + // main rule (match only after 5 minutes, but not all sub-rules are matched) + String startTag = "start_" + RULE_NAME; + String endTag = "end_" + RULE_NAME; + TimeAmount timeAmount = TimeAmount.parseTimeAmount("5 minutes"); + + rules.add( + rule( RULE_NAME ).metadata(RULE_TYPE_TAG, KEYWORD) + .build( + protoPattern(controlVar1).expr( "rulename", Index.ConstraintType.EQUAL, startTag ), + not( protoPattern(controlVar2) + .expr( "rulename", Index.ConstraintType.EQUAL, endTag ) + .expr( after(0, timeAmount.getTimeUnit(), timeAmount.getAmount(), timeAmount.getTimeUnit()), controlVar1 ) ), + on(controlVar1, global).execute((drools, controlFact, globalResults) -> { + globalResults.add(RULE_NAME); + drools.delete(controlFact); + }) + ) + ); + + String rulePrefix = RULE_NAME + "_"; + + // sub-rule 0 + String subRuleName0 = rulePrefix + "0"; + PrototypeVariable patternVariable0 = variable(getPrototype(DEFAULT_PROTOTYPE_NAME), "m_0"); + rules.add( + rule( subRuleName0 ).metadata(SYNTHETIC_RULE_TAG, true) + .build( + protoPattern(patternVariable0) + .expr("ping.timeout", Index.ConstraintType.EQUAL, true), + not( protoPattern(controlVar1) + .expr( "rulename", Index.ConstraintType.EQUAL, subRuleName0 ) ), + on(patternVariable0).execute((drools, t1) -> { + Event controlEvent = createMapBasedEvent( controlPrototype ) + .withExpiration(timeAmount.getAmount(), timeAmount.getTimeUnit()); + controlEvent.set( "rulename", subRuleName0 ); + controlEvent.set( "event", t1 ); + controlEvent.set( "binding", ((RuleContext) drools).getMatch().getDeclarationIds().get(0) ); + drools.insert(controlEvent); + }) + ) + ); + + // sub-rule 1 + String subRuleName1 = rulePrefix + "1"; + PrototypeVariable patternVariable1 = variable(getPrototype(DEFAULT_PROTOTYPE_NAME), "m_1"); + rules.add( + rule( subRuleName1 ).metadata(SYNTHETIC_RULE_TAG, true) + .build( + protoPattern(patternVariable1) + .expr("sensu.process.status", Index.ConstraintType.EQUAL, "stopped"), + not( protoPattern(controlVar1) + .expr( "rulename", Index.ConstraintType.EQUAL, subRuleName1 ) ), + on(patternVariable1).execute((drools, t1) -> { + Event controlEvent = createMapBasedEvent( controlPrototype ) + .withExpiration(timeAmount.getAmount(), timeAmount.getTimeUnit()); + controlEvent.set( "rulename", subRuleName1 ); + controlEvent.set( "event", t1 ); + controlEvent.set( "binding", ((RuleContext) drools).getMatch().getDeclarationIds().get(0) ); + drools.insert(controlEvent); + }) + ) + ); + + // sub-rule 2 + String subRuleName2 = rulePrefix + "2"; + PrototypeVariable patternVariable2 = variable(getPrototype(DEFAULT_PROTOTYPE_NAME), "m_2"); + rules.add( + rule( subRuleName2 ).metadata(SYNTHETIC_RULE_TAG, true) + .build( + protoPattern(patternVariable2) + .expr("sensu.storage.percent", Index.ConstraintType.GREATER_THAN, 95), + not( protoPattern(controlVar1) + .expr( "rulename", Index.ConstraintType.EQUAL, subRuleName2 ) ), + on(patternVariable2).execute((drools, t1) -> { + Event controlEvent = createMapBasedEvent( controlPrototype ) + .withExpiration(timeAmount.getAmount(), timeAmount.getTimeUnit()); + controlEvent.set( "rulename", subRuleName2 ); + controlEvent.set( "event", t1 ); + controlEvent.set( "binding", ((RuleContext) drools).getMatch().getDeclarationIds().get(0) ); + drools.insert(controlEvent); + }) + ) + ); + + // start rule + rules.add( + rule(startTag).metadata(SYNTHETIC_RULE_TAG, true) + .build( + not( protoPattern(controlVar1).expr( "rulename", Index.ConstraintType.EQUAL, startTag ) ), + protoPattern(controlVar2).expr( p -> ((String)p.get("rulename")).startsWith(rulePrefix) ), + on(controlVar2).execute((drools, firstEvent) -> { + Event controlEvent = createMapBasedEvent( controlPrototype ) + .withExpiration(timeAmount.getAmount(), timeAmount.getTimeUnit()); + controlEvent.set( "rulename", startTag ); + controlEvent.set( "event", firstEvent.get("event") ); + controlEvent.set( "binding", firstEvent.get("binding") ); + drools.insert(controlEvent); + }) + ) + ); + + // end rule + rules.add( + rule(endTag).metadata(SYNTHETIC_RULE_TAG, true) + .build( + protoPattern(controlVar1).expr( "rulename", Index.ConstraintType.EQUAL, startTag ), + accumulate( protoPattern(controlVar2).expr(p -> ((String)p.get("rulename")).startsWith(rulePrefix)), + accFunction(org.drools.core.base.accumulators.CountAccumulateFunction::new).as(resultCount)), + pattern(resultCount).expr(count -> count == 3), + on(resultCount).execute((drools, count) -> { + Event controlEvent = createMapBasedEvent( controlPrototype ) + .withExpiration(timeAmount.getAmount(), timeAmount.getTimeUnit()); + controlEvent.set( "rulename", endTag ); + drools.insert(controlEvent); + }) + ) + ); + + // cleanup rule 1 + rules.add( + rule( rulePrefix + "cleanupEvents" ).metadata(SYNTHETIC_RULE_TAG, true) + .build( + protoPattern(controlVar1).expr( "rulename", Index.ConstraintType.EQUAL, endTag ), + protoPattern(controlVar2).expr(p -> ((String)p.get("rulename")).startsWith(rulePrefix)), + on(controlVar1, controlVar2).execute((drools, c1, c2) -> { + drools.delete(c2.get("event")); + drools.delete(c2); + }) + ) + ); + + // cleanup rule 2 + rules.add( + rule( rulePrefix + "cleanupEvents2" ).metadata(SYNTHETIC_RULE_TAG, true) + .build( + protoPattern(controlVar1).expr( "rulename", Index.ConstraintType.EQUAL, startTag ), + not( protoPattern(controlVar2) + .expr( "rulename", Index.ConstraintType.EQUAL, endTag ) + .expr( after(0, timeAmount.getTimeUnit(), timeAmount.getAmount(), timeAmount.getTimeUnit()), controlVar1 ) ), + protoPattern(controlVar3).expr(p -> ((String)p.get("rulename")).startsWith(rulePrefix)), + on(controlVar1, controlVar3).execute((drools, c1, c3) -> { + drools.delete(c3.get("event")); + drools.delete(c3); + drools.delete(c1); + }) + ) + ); + + // cleanup rule 3 + rules.add( + rule( rulePrefix + "cleanupTerminal" ).metadata(SYNTHETIC_RULE_TAG, true) + .build( + protoPattern(controlVar1).expr( "rulename", Index.ConstraintType.EQUAL, startTag ), + protoPattern(controlVar2).expr( "rulename", Index.ConstraintType.EQUAL, endTag ), + on(controlVar1, controlVar2).execute((drools, c1, c2) -> { + drools.delete(c1); + drools.delete(c2); + }) + ) + ); + + return new ModelImpl().withRules(rules).addGlobal(global); + } + + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") + void insertFailoverExpireFire_shouldFire(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { + + createSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + insertMatchingSensuProcessStatusEvent("stopped"); + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertMatchingPingTimeoutEvent(true); + advanceTimeAndFire(1, TimeUnit.MINUTES); + + assertThat(getResults()).as("timeout is 5 minutes window. Not yet fired") + .isEmpty(); + + failover(); + restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + advanceTimeAndFire(5, TimeUnit.MINUTES); // main rule's temporal constraint is evaluated + + assertThat(getResults()).as("after 5 minutes window. the main rule is fired") + .hasSize(1); + } + + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") + void insertExpireFailoverFire_shouldFire(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { + + createSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + insertMatchingSensuProcessStatusEvent("stopped"); + advanceTimeAndFire(1, TimeUnit.MINUTES); + + insertMatchingPingTimeoutEvent(true); + advanceTimeAndFire(1, TimeUnit.MINUTES); + + advanceTime(5, TimeUnit.MINUTES); // TimerNodeJob is triggered, but the action is still in propagationList. Will be lost by server crash + + assertThat(getResults()).as("5 minutes timeout is over. But not yet fired") + .isEmpty(); + + failover(); + restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); // TimerNodeJob is recreated and triggered + + fireAllRules(); + + assertThat(getResults()).as("after 5 minutes window. the main rule is fired") + .hasSize(1); + } +} diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java index e7cfcc28afb..8161df6e0a6 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java @@ -16,6 +16,7 @@ package org.drools.reliability.infinispan; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -52,14 +53,6 @@ import org.slf4j.LoggerFactory; import org.test.domain.Person; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.stream.Stream; - - import static org.drools.reliability.infinispan.InfinispanStorageManagerFactory.INFINISPAN_STORAGE_MARSHALLER; import static org.drools.reliability.infinispan.util.PrototypeUtils.createEvent; import static org.drools.util.Config.getConfig; @@ -68,6 +61,8 @@ @ExtendWith(BeforeAllMethodExtension.class) public abstract class ReliabilityTestBasics { + public static final String RULE_TYPE_TAG = "RULE_TYPE"; + public static final String SYNTHETIC_RULE_TAG = "SYNTHETIC_RULE"; private static final Logger LOG = LoggerFactory.getLogger(ReliabilityTestBasics.class); private InfinispanContainer container; @@ -378,4 +373,22 @@ private void insertSensuEvent(String host, String type) { sensu.set("sensu.process.type", type); sessions.get(0).insert(sensu); } + + protected void insertMatchingSensuProcessStatusEvent(String status) { + Event sensu = createEvent(); + sensu.set("sensu.process.status", status); + sessions.get(0).insert(sensu); + } + + protected void insertMatchingPingTimeoutEvent(boolean timeout) { + Event ping = createEvent(); + ping.set("ping.timeout", timeout); + sessions.get(0).insert(ping); + } + + protected void insertMatchingSensuStoragePercentEvent(int percent) { + Event sensu = createEvent(); + sensu.set("sensu.storage.percent", percent); + sessions.get(0).insert(sensu); + } }