diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java index 2eceb3057ec3..48237c2329b5 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.kie.api.conf.EventProcessingOption; +import org.kie.api.runtime.KieSession; import org.kie.api.runtime.conf.ClockTypeOption; import org.kie.api.runtime.conf.PersistedSessionOption; import org.kie.api.time.SessionPseudoClock; @@ -35,13 +36,13 @@ class ReliabilityCepTest extends ReliabilityTestBasics { private static final String CEP_RULE = "import " + StockTick.class.getCanonicalName() + ";" + - "global java.util.List results;" + - "rule R when\n" + - " $a : StockTick( company == \"DROO\" )\n" + - " $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" + - "then\n" + - " results.add(\"fired\");\n" + - "end\n"; + "global java.util.List results;" + + "rule R when\n" + + " $a : StockTick( company == \"DROO\" )\n" + + " $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" + + "then\n" + + " results.add(\"fired\");\n" + + "end\n"; @ParameterizedTest @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") // FULL fails with "ReliablePropagationList; no valid constructor" @@ -51,9 +52,9 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO SessionPseudoClock clock = getSessionClock(); - insert( new StockTick( "DROO" ) ); - clock.advanceTime( 6, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + insert(new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(new StockTick("ACME")); //-- Assume JVM down here. Fail-over to other JVM or rebooted JVM //-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id" @@ -65,20 +66,55 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO assertThat(getResults()).containsExactlyInAnyOrder("fired"); clearResults(); - clock.advanceTime( 1, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + clock.advanceTime(1, TimeUnit.SECONDS); + insert(new StockTick("ACME")); assertThat(fireAllRules()).isEqualTo(1); assertThat(getResults()).containsExactlyInAnyOrder("fired"); clearResults(); - clock.advanceTime( 3, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + clock.advanceTime(3, TimeUnit.SECONDS); + insert(new StockTick("ACME")); assertThat(fireAllRules()).isZero(); assertThat(getResults()).isEmpty(); } + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") // FULL fails with "ReliablePropagationList; no valid constructor" + void insertAdvanceInsertFailoverFireTwice_shouldRecoverFromFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { + + KieSession session1 = createSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + SessionPseudoClock clock = getSessionClock(session1); + + insert(session1, new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(session1, new StockTick("ACME")); + + //-- Assume JVM down here. Fail-over to other JVM or rebooted JVM + //-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id" + failover(); + session1 = restoreSession(session1.getIdentifier(), CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + clock = getSessionClock(session1); + + assertThat(fireAllRules(session1)).isEqualTo(1); + assertThat(getResults(session1)).containsExactlyInAnyOrder("fired"); + clearResults(session1); + + clock.advanceTime(3, TimeUnit.SECONDS); + insert(session1, new StockTick("ACME")); + + //-- Assume JVM down here. Fail-over to other JVM or rebooted JVM + //-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id" + failover(); + session1 = restoreSession(session1.getIdentifier(), CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + clock = getSessionClock(session1); + + assertThat(fireAllRules(session1)).isZero(); + assertThat(getResults(session1)).isEmpty(); + } + @ParameterizedTest @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { @@ -86,9 +122,9 @@ void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionO createSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); SessionPseudoClock clock = getSessionClock(); - insert( new StockTick( "DROO" ) ); - clock.advanceTime( 6, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + insert(new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(new StockTick("ACME")); failover(); restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); @@ -96,9 +132,9 @@ void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionO clock.advanceTime(58, TimeUnit.SECONDS); assertThat(fireAllRules()).as("DROO is expired, but a match is available.") - .isEqualTo(1); + .isEqualTo(1); assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s") - .hasSize(1); + .hasSize(1); } @ParameterizedTest @@ -113,7 +149,7 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO insert(new StockTick("ACME")); assertThat(fireAllRules()).as("DROO is expired, but a match is available.") - .isEqualTo(1); + .isEqualTo(1); failover(); restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); @@ -123,6 +159,7 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO fireAllRules(); assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s") - .hasSize(1); + .hasSize(1); } -} + +} \ No newline at end of file diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java index 8161df6e0a63..757e8bba4339 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java @@ -15,18 +15,13 @@ package org.drools.reliability.infinispan; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - import org.drools.base.facttemplates.Event; import org.drools.core.ClassObjectFilter; +import org.drools.kiesession.session.StatefulKnowledgeSessionImpl; import org.drools.model.Model; import org.drools.model.codegen.ExecutableModelProject; import org.drools.modelcompiler.KieBaseBuilder; +import org.drools.reliability.core.ReliableGlobalResolver; import org.drools.reliability.core.ReliableKieSession; import org.drools.reliability.core.ReliableRuntimeComponentFactoryImpl; import org.drools.reliability.core.StorageManagerFactory; @@ -53,6 +48,14 @@ import org.slf4j.LoggerFactory; import org.test.domain.Person; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + import static org.drools.reliability.infinispan.InfinispanStorageManagerFactory.INFINISPAN_STORAGE_MARSHALLER; import static org.drools.reliability.infinispan.util.PrototypeUtils.createEvent; import static org.drools.util.Config.getConfig; @@ -68,8 +71,7 @@ public abstract class ReliabilityTestBasics { private InfinispanContainer container; protected final List sessions = new ArrayList<>(); - - private long persistedSessionId = -1; + protected final HashMap persistedSessionIds = new HashMap<>(); protected PersistedSessionOption.SafepointStrategy safepointStrategy; @@ -220,6 +222,7 @@ protected void clearResults() { protected void clearResults(KieSession session) { ((List) session.getGlobal("results")).clear(); + ((ReliableGlobalResolver)((StatefulKnowledgeSessionImpl) session).getGlobalResolver()).updateStorage(); } protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, Option... options) { @@ -239,11 +242,13 @@ protected KieSession restoreSession(String drl, PersistedSessionOption.Persisten } protected KieSession restoreSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) { - return restoreSession(persistedSessionId, drl, persistenceStrategy, safepointStrategy, options); + Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0]; + return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, options); } protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) { - return getKieSession(drl, PersistedSessionOption.fromSession(sessionId).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options); + Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId); + return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options); } protected int fireAllRules() { @@ -255,7 +260,8 @@ protected int fireAllRules(KieSession session) { } protected KieSession restoreSession(Model ruleModel, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) { - return getKieSession(ruleModel, PersistedSessionOption.fromSession(persistedSessionId).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options); + Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0]; + return getKieSession(ruleModel, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options); } protected void disposeSession() { @@ -314,12 +320,12 @@ private KieSession getKieSessionFromKieBase(KieBase kbase, PersistedSessionOptio } Stream.of(optionsFilter.getKieSessionOption()).forEach(conf::setOption); KieSession session = kbase.newKieSession(conf, null); - sessions.add(session); if (persistedSessionOption == null || persistedSessionOption.isNewSession()) { List results = new ArrayList<>(); session.setGlobal("results", results); - persistedSessionId = session.getIdentifier(); } + sessions.add(session); + persistedSessionIds.put(session.getIdentifier(),persistedSessionOption == null || persistedSessionOption.isNewSession() ? session.getIdentifier() : persistedSessionOption.getSessionId()); return session; }