Skip to content

Commit

Permalink
updateStorage after clearResults
Browse files Browse the repository at this point in the history
  • Loading branch information
nprentza committed Jun 29, 2023
1 parent 78edab1 commit cce6933
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.kie.api.time.SessionPseudoClock;
Expand All @@ -35,13 +36,13 @@ class ReliabilityCepTest extends ReliabilityTestBasics {

private static final String CEP_RULE =
"import " + StockTick.class.getCanonicalName() + ";" +
"global java.util.List results;" +
"rule R when\n" +
" $a : StockTick( company == \"DROO\" )\n" +
" $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" +
"then\n" +
" results.add(\"fired\");\n" +
"end\n";
"global java.util.List results;" +
"rule R when\n" +
" $a : StockTick( company == \"DROO\" )\n" +
" $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" +
"then\n" +
" results.add(\"fired\");\n" +
"end\n";

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") // FULL fails with "ReliablePropagationList; no valid constructor"
Expand All @@ -51,9 +52,9 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO

SessionPseudoClock clock = getSessionClock();

insert( new StockTick( "DROO" ) );
clock.advanceTime( 6, TimeUnit.SECONDS );
insert( new StockTick( "ACME" ) );
insert(new StockTick("DROO"));
clock.advanceTime(6, TimeUnit.SECONDS);
insert(new StockTick("ACME"));

//-- Assume JVM down here. Fail-over to other JVM or rebooted JVM
//-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id"
Expand All @@ -65,40 +66,75 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO
assertThat(getResults()).containsExactlyInAnyOrder("fired");
clearResults();

clock.advanceTime( 1, TimeUnit.SECONDS );
insert( new StockTick( "ACME" ) );
clock.advanceTime(1, TimeUnit.SECONDS);
insert(new StockTick("ACME"));

assertThat(fireAllRules()).isEqualTo(1);
assertThat(getResults()).containsExactlyInAnyOrder("fired");
clearResults();

clock.advanceTime( 3, TimeUnit.SECONDS );
insert( new StockTick( "ACME" ) );
clock.advanceTime(3, TimeUnit.SECONDS);
insert(new StockTick("ACME"));

assertThat(fireAllRules()).isZero();
assertThat(getResults()).isEmpty();
}

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") // FULL fails with "ReliablePropagationList; no valid constructor"
void insertAdvanceInsertFailoverFireTwice_shouldRecoverFromFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) {

KieSession session1 = createSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);

SessionPseudoClock clock = getSessionClock(session1);

insert(session1, new StockTick("DROO"));
clock.advanceTime(6, TimeUnit.SECONDS);
insert(session1, new StockTick("ACME"));

//-- Assume JVM down here. Fail-over to other JVM or rebooted JVM
//-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id"
failover();
session1 = restoreSession(session1.getIdentifier(), CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);
clock = getSessionClock(session1);

assertThat(fireAllRules(session1)).isEqualTo(1);
assertThat(getResults(session1)).containsExactlyInAnyOrder("fired");
clearResults(session1);

clock.advanceTime(3, TimeUnit.SECONDS);
insert(session1, new StockTick("ACME"));

//-- Assume JVM down here. Fail-over to other JVM or rebooted JVM
//-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id"
failover();
session1 = restoreSession(session1.getIdentifier(), CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);
clock = getSessionClock(session1);

assertThat(fireAllRules(session1)).isZero();
assertThat(getResults(session1)).isEmpty();
}

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) {

createSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);
SessionPseudoClock clock = getSessionClock();

insert( new StockTick( "DROO" ) );
clock.advanceTime( 6, TimeUnit.SECONDS );
insert( new StockTick( "ACME" ) );
insert(new StockTick("DROO"));
clock.advanceTime(6, TimeUnit.SECONDS);
insert(new StockTick("ACME"));

failover();
restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);
clock = getSessionClock();

clock.advanceTime(58, TimeUnit.SECONDS);
assertThat(fireAllRules()).as("DROO is expired, but a match is available.")
.isEqualTo(1);
.isEqualTo(1);
assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s")
.hasSize(1);
.hasSize(1);
}

@ParameterizedTest
Expand All @@ -113,7 +149,7 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO
insert(new StockTick("ACME"));

assertThat(fireAllRules()).as("DROO is expired, but a match is available.")
.isEqualTo(1);
.isEqualTo(1);

failover();
restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);
Expand All @@ -123,6 +159,7 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO
fireAllRules();

assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s")
.hasSize(1);
.hasSize(1);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@

package org.drools.reliability.infinispan;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.drools.base.facttemplates.Event;
import org.drools.core.ClassObjectFilter;
import org.drools.kiesession.session.StatefulKnowledgeSessionImpl;
import org.drools.model.Model;
import org.drools.model.codegen.ExecutableModelProject;
import org.drools.modelcompiler.KieBaseBuilder;
import org.drools.reliability.core.ReliableGlobalResolver;
import org.drools.reliability.core.ReliableKieSession;
import org.drools.reliability.core.ReliableRuntimeComponentFactoryImpl;
import org.drools.reliability.core.StorageManagerFactory;
Expand All @@ -53,6 +48,14 @@
import org.slf4j.LoggerFactory;
import org.test.domain.Person;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.drools.reliability.infinispan.InfinispanStorageManagerFactory.INFINISPAN_STORAGE_MARSHALLER;
import static org.drools.reliability.infinispan.util.PrototypeUtils.createEvent;
import static org.drools.util.Config.getConfig;
Expand All @@ -68,8 +71,7 @@ public abstract class ReliabilityTestBasics {
private InfinispanContainer container;

protected final List<KieSession> sessions = new ArrayList<>();

private long persistedSessionId = -1;
protected final HashMap<Long,Long> persistedSessionIds = new HashMap<>();

protected PersistedSessionOption.SafepointStrategy safepointStrategy;

Expand Down Expand Up @@ -220,6 +222,7 @@ protected void clearResults() {

protected void clearResults(KieSession session) {
((List<Object>) session.getGlobal("results")).clear();
((ReliableGlobalResolver)((StatefulKnowledgeSessionImpl) session).getGlobalResolver()).updateStorage();
}

protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, Option... options) {
Expand All @@ -239,11 +242,13 @@ protected KieSession restoreSession(String drl, PersistedSessionOption.Persisten
}

protected KieSession restoreSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) {
return restoreSession(persistedSessionId, drl, persistenceStrategy, safepointStrategy, options);
Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0];
return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, options);
}

protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) {
return getKieSession(drl, PersistedSessionOption.fromSession(sessionId).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options);
Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId);
return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options);
}

protected int fireAllRules() {
Expand All @@ -255,7 +260,8 @@ protected int fireAllRules(KieSession session) {
}

protected KieSession restoreSession(Model ruleModel, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) {
return getKieSession(ruleModel, PersistedSessionOption.fromSession(persistedSessionId).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options);
Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0];
return getKieSession(ruleModel, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options);
}

protected void disposeSession() {
Expand Down Expand Up @@ -314,12 +320,12 @@ private KieSession getKieSessionFromKieBase(KieBase kbase, PersistedSessionOptio
}
Stream.of(optionsFilter.getKieSessionOption()).forEach(conf::setOption);
KieSession session = kbase.newKieSession(conf, null);
sessions.add(session);
if (persistedSessionOption == null || persistedSessionOption.isNewSession()) {
List<Object> results = new ArrayList<>();
session.setGlobal("results", results);
persistedSessionId = session.getIdentifier();
}
sessions.add(session);
persistedSessionIds.put(session.getIdentifier(),persistedSessionOption == null || persistedSessionOption.isNewSession() ? session.getIdentifier() : persistedSessionOption.getSessionId());
return session;
}

Expand Down

0 comments on commit cce6933

Please sign in to comment.