Skip to content

Commit

Permalink
[DROOLS-7481] Add Cep complex test : ansible use cases : 2nd round
Browse files Browse the repository at this point in the history
- ReliabilityCepOnceWithinTest : ok
- ReliabilityCepTimedOutTest : Had an issue with a nested object reference after deserialization
- Introduce EqualityObjectStore
- Introduce UUID in HashMapFactImpl for equals/hashCode
- WIP
  • Loading branch information
tkobayas committed Jun 23, 2023
1 parent ad19dc5 commit 2b490bf
Show file tree
Hide file tree
Showing 8 changed files with 591 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023. Red Hat, Inc. and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.core.common;

import java.util.HashMap;

public class EqualityObjectStore extends MapObjectStore {

public EqualityObjectStore() {
super(new HashMap<>());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.drools.base.facttemplates.Fact;
import org.drools.base.facttemplates.FactTemplate;
import org.drools.model.PrototypeFact;

public class HashMapFactImpl implements Fact, PrototypeFact, Serializable {

protected final UUID uuid;

protected final FactTemplate factTemplate;

protected final Map<String, Object> valuesMap;
Expand All @@ -35,6 +38,7 @@ public HashMapFactImpl( FactTemplate factTemplate ) {
}

public HashMapFactImpl( FactTemplate factTemplate, Map<String, Object> valuesMap ) {
this.uuid = UUID.randomUUID();
this.factTemplate = factTemplate;
this.valuesMap = valuesMap;
}
Expand Down Expand Up @@ -68,4 +72,30 @@ public Map<String, Object> asMap() {
public String toString() {
return "Fact " + factTemplate.getName() + " with values = " + valuesMap;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((uuid == null) ? 0 : uuid.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
HashMapFactImpl other = (HashMapFactImpl) obj;
if (uuid == null) {
if (other.uuid != null)
return false;
} else if (!uuid.equals(other.uuid))
return false;
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;

import org.drools.core.common.Storage;
import org.drools.core.phreak.PhreakTimerNode.TimerNodeJob;
import org.drools.core.reteoo.ObjectTypeNode.ExpireJob;
import org.drools.core.time.impl.PseudoClockScheduler;
import org.drools.core.time.impl.TimerJobInstance;
Expand Down Expand Up @@ -65,11 +66,11 @@ private void updateStorage() {
}

/**
* ExpireJob is recreated by repropagate, so doesn't need to persist
* ExpireJob and TimerNodeJob are recreated by repropagate, so we don't need to persist
*/
public List<TimerJobInstance> createFilteredInternalQueueForPersistence(PriorityQueue<TimerJobInstance> queue) {
return queue.stream()
.filter(job -> !(job.getJob() instanceof ExpireJob))
.filter(job -> !(job.getJob() instanceof ExpireJob || job.getJob() instanceof TimerNodeJob))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

import org.drools.core.ClockType;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.IdentityObjectStore;
import org.drools.core.common.EqualityObjectStore;
import org.drools.core.common.InternalFactHandle;
import org.drools.core.common.InternalWorkingMemory;
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {
public class SimpleSerializationReliableObjectStore extends EqualityObjectStore implements SimpleReliableObjectStore {

protected final transient Storage<Long, StoredObject> storage;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,11 @@
@ExtendWith(BeforeAllMethodExtension.class)
class ReliabilityCepOnceAfterTest extends ReliabilityTestBasics {

public static final String RULE_TYPE_TAG = "RULE_TYPE";
public static final String SYNTHETIC_RULE_TAG = "SYNTHETIC_RULE";
public static final String KEYWORD = "once_after";
public static final String RULE_NAME = "R";

/**
* These rules are created in the same way as in OnceAfterDefinition in drools-ansible-rulebook-integration
*
* @return
* These rules are created in the same way as OnceAfterDefinition in drools-ansible-rulebook-integration
*/
private Model ruleModel() {
Prototype controlPrototype = getPrototype(SYNTHETIC_PROTOTYPE_NAME);
Expand All @@ -81,7 +77,8 @@ private Model ruleModel() {
List<Rule> rules = new ArrayList<>();

// main rule (accumulate events within 10 minutes)
rules.add(rule(RULE_NAME).metadata(RULE_TYPE_TAG, KEYWORD)
rules.add(
rule(RULE_NAME).metadata(RULE_TYPE_TAG, KEYWORD)
.build(
protoPattern(controlVar1).expr("end_once_after", Index.ConstraintType.EQUAL, RULE_NAME),
not(protoPattern(controlVar2).expr("start_once_after", Index.ConstraintType.EQUAL, RULE_NAME)),
Expand All @@ -90,21 +87,22 @@ private Model ruleModel() {
on(controlVar1, resultsVar, global).execute((drools, controlFact, resultFactList, globalResults) -> {
drools.delete(controlFact);
processResults(globalResults, resultFactList);
((List) resultFactList).forEach(drools::delete);
resultFactList.forEach(drools::delete);
})
)
);

// control rule (wrapping original event)
PrototypeVariable originalEventVariable = variable(getPrototype(DEFAULT_PROTOTYPE_NAME), "m");
rules.add(rule(RULE_NAME + "_control").metadata(SYNTHETIC_RULE_TAG, true)
rules.add(
rule(RULE_NAME + "_control").metadata(SYNTHETIC_RULE_TAG, true)
.build(
guardedPattern(originalEventVariable),
not(duplicateControlPattern(originalEventVariable)),
on(originalEventVariable).execute((drools, event) -> {
Event controlEvent = createMapBasedEvent(controlPrototype);
controlEvent.set("sensu.host", event.get("sensu.host"));
controlEvent.set("sensu.process.type", event.get("sensu.process.type"));
controlEvent.set("sensu.host", event.get("sensu.host")); // groupByAttributes
controlEvent.set("sensu.process.type", event.get("sensu.process.type")); // groupByAttributes
controlEvent.set("drools_rule_name", RULE_NAME);
controlEvent.set("event", event);
controlEvent.set("once_after_time_window", "10 minutes");
Expand All @@ -117,7 +115,8 @@ private Model ruleModel() {

// start rule (insert start and end control events. start event expires in 10 minutes, so the main rule will fire)
TimeAmount timeAmount = TimeAmount.parseTimeAmount("10 minutes");
rules.add(rule(RULE_NAME + "_start").metadata(SYNTHETIC_RULE_TAG, true)
rules.add(
rule(RULE_NAME + "_start").metadata(SYNTHETIC_RULE_TAG, true)
.build(
protoPattern(controlVar1).expr("drools_rule_name", Index.ConstraintType.EQUAL, RULE_NAME),
not(protoPattern(controlVar2).expr("end_once_after", Index.ConstraintType.EQUAL, RULE_NAME)),
Expand All @@ -136,7 +135,8 @@ private Model ruleModel() {

// cleanup duplicate events rule
PrototypeDSL.PrototypePatternDef duplicateControlPattern = duplicateControlPattern(originalEventVariable);
rules.add(rule(RULE_NAME + "_cleanup_duplicate").metadata(SYNTHETIC_RULE_TAG, true)
rules.add(
rule(RULE_NAME + "_cleanup_duplicate").metadata(SYNTHETIC_RULE_TAG, true)
.build(
guardedPattern(originalEventVariable),
duplicateControlPattern,
Expand All @@ -158,8 +158,8 @@ private static PrototypeDSL.PrototypePatternDef guardedPattern(PrototypeVariable
// We group-by sensu.host and sensu.process.type
private static PrototypeDSL.PrototypePatternDef duplicateControlPattern(PrototypeVariable originalEventVariable) {
return protoPattern(variable(getPrototype(SYNTHETIC_PROTOTYPE_NAME)))
.expr(prototypeField("sensu.host"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.host"))
.expr(prototypeField("sensu.process.type"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.process.type"))
.expr(prototypeField("sensu.host"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.host")) // groupByAttributes
.expr(prototypeField("sensu.process.type"), Index.ConstraintType.EQUAL, originalEventVariable, prototypeField("sensu.process.type")) // groupByAttributes
.expr(prototypeField("drools_rule_name"), Index.ConstraintType.EQUAL, fixedValue(RULE_NAME));
}

Expand Down Expand Up @@ -222,13 +222,13 @@ void insertAdvanceFailoverFire_shouldRecoverFromFailover(PersistedSessionOption.
assertThat(getResults()).as("once_after is 10 minutes window. The main rule should not be fired yet")
.isEmpty();

advanceTime(7, TimeUnit.MINUTES);
advanceTime(7, TimeUnit.MINUTES); // controlEvent expire job should be triggered, but the action is still in propagationList. Will be lost by server crash

// advanceTime, then server crash before fireAllRules
// Generally this doesn't happen in drools-ansible because AutomaticPseudoClock does advanceTime + fireAllRules atomically, but simulating a crash in the middle.

failover();
restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);
restoreSession(ruleModel(), persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); // expire job is recreated and triggered

fireAllRules();

Expand Down
Loading

0 comments on commit 2b490bf

Please sign in to comment.