From a789aed67ecb72f797514d9e1fd242a1de6a29f4 Mon Sep 17 00:00:00 2001 From: nprentza Date: Thu, 29 Jun 2023 15:01:51 +0300 Subject: [PATCH 1/2] updateStorage after clearResults --- .../infinispan/ReliabilityCepTest.java | 81 ++++++++++++++----- .../infinispan/ReliabilityTestBasics.java | 3 + 2 files changed, 62 insertions(+), 22 deletions(-) diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java index 2eceb3057ec..48237c2329b 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.kie.api.conf.EventProcessingOption; +import org.kie.api.runtime.KieSession; import org.kie.api.runtime.conf.ClockTypeOption; import org.kie.api.runtime.conf.PersistedSessionOption; import org.kie.api.time.SessionPseudoClock; @@ -35,13 +36,13 @@ class ReliabilityCepTest extends ReliabilityTestBasics { private static final String CEP_RULE = "import " + StockTick.class.getCanonicalName() + ";" + - "global java.util.List results;" + - "rule R when\n" + - " $a : StockTick( company == \"DROO\" )\n" + - " $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" + - "then\n" + - " results.add(\"fired\");\n" + - "end\n"; + "global java.util.List results;" + + "rule R when\n" + + " $a : StockTick( company == \"DROO\" )\n" + + " $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" + + "then\n" + + " results.add(\"fired\");\n" + + "end\n"; @ParameterizedTest @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") // FULL fails with "ReliablePropagationList; no valid constructor" @@ -51,9 +52,9 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO SessionPseudoClock clock = getSessionClock(); - insert( new StockTick( "DROO" ) ); - clock.advanceTime( 6, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + insert(new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(new StockTick("ACME")); //-- Assume JVM down here. Fail-over to other JVM or rebooted JVM //-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id" @@ -65,20 +66,55 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO assertThat(getResults()).containsExactlyInAnyOrder("fired"); clearResults(); - clock.advanceTime( 1, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + clock.advanceTime(1, TimeUnit.SECONDS); + insert(new StockTick("ACME")); assertThat(fireAllRules()).isEqualTo(1); assertThat(getResults()).containsExactlyInAnyOrder("fired"); clearResults(); - clock.advanceTime( 3, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + clock.advanceTime(3, TimeUnit.SECONDS); + insert(new StockTick("ACME")); assertThat(fireAllRules()).isZero(); assertThat(getResults()).isEmpty(); } + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") // FULL fails with "ReliablePropagationList; no valid constructor" + void insertAdvanceInsertFailoverFireTwice_shouldRecoverFromFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { + + KieSession session1 = createSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + SessionPseudoClock clock = getSessionClock(session1); + + insert(session1, new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(session1, new StockTick("ACME")); + + //-- Assume JVM down here. Fail-over to other JVM or rebooted JVM + //-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id" + failover(); + session1 = restoreSession(session1.getIdentifier(), CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + clock = getSessionClock(session1); + + assertThat(fireAllRules(session1)).isEqualTo(1); + assertThat(getResults(session1)).containsExactlyInAnyOrder("fired"); + clearResults(session1); + + clock.advanceTime(3, TimeUnit.SECONDS); + insert(session1, new StockTick("ACME")); + + //-- Assume JVM down here. Fail-over to other JVM or rebooted JVM + //-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id" + failover(); + session1 = restoreSession(session1.getIdentifier(), CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + clock = getSessionClock(session1); + + assertThat(fireAllRules(session1)).isZero(); + assertThat(getResults(session1)).isEmpty(); + } + @ParameterizedTest @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { @@ -86,9 +122,9 @@ void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionO createSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); SessionPseudoClock clock = getSessionClock(); - insert( new StockTick( "DROO" ) ); - clock.advanceTime( 6, TimeUnit.SECONDS ); - insert( new StockTick( "ACME" ) ); + insert(new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(new StockTick("ACME")); failover(); restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); @@ -96,9 +132,9 @@ void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionO clock.advanceTime(58, TimeUnit.SECONDS); assertThat(fireAllRules()).as("DROO is expired, but a match is available.") - .isEqualTo(1); + .isEqualTo(1); assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s") - .hasSize(1); + .hasSize(1); } @ParameterizedTest @@ -113,7 +149,7 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO insert(new StockTick("ACME")); assertThat(fireAllRules()).as("DROO is expired, but a match is available.") - .isEqualTo(1); + .isEqualTo(1); failover(); restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); @@ -123,6 +159,7 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO fireAllRules(); assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s") - .hasSize(1); + .hasSize(1); } -} + +} \ No newline at end of file diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java index 3612104b4e1..757e8bba433 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityTestBasics.java @@ -17,9 +17,11 @@ import org.drools.base.facttemplates.Event; import org.drools.core.ClassObjectFilter; +import org.drools.kiesession.session.StatefulKnowledgeSessionImpl; import org.drools.model.Model; import org.drools.model.codegen.ExecutableModelProject; import org.drools.modelcompiler.KieBaseBuilder; +import org.drools.reliability.core.ReliableGlobalResolver; import org.drools.reliability.core.ReliableKieSession; import org.drools.reliability.core.ReliableRuntimeComponentFactoryImpl; import org.drools.reliability.core.StorageManagerFactory; @@ -220,6 +222,7 @@ protected void clearResults() { protected void clearResults(KieSession session) { ((List) session.getGlobal("results")).clear(); + ((ReliableGlobalResolver)((StatefulKnowledgeSessionImpl) session).getGlobalResolver()).updateStorage(); } protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, Option... options) { From e12c1408f9895c99f56599513135c32abcbea08a Mon Sep 17 00:00:00 2001 From: nprentza Date: Mon, 3 Jul 2023 11:54:30 +0300 Subject: [PATCH 2/2] rebase --- .../infinispan/ReliabilityCepTest.java | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java index 48237c2329b..8851527e2ca 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityCepTest.java @@ -36,13 +36,13 @@ class ReliabilityCepTest extends ReliabilityTestBasics { private static final String CEP_RULE = "import " + StockTick.class.getCanonicalName() + ";" + - "global java.util.List results;" + - "rule R when\n" + - " $a : StockTick( company == \"DROO\" )\n" + - " $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" + - "then\n" + - " results.add(\"fired\");\n" + - "end\n"; + "global java.util.List results;" + + "rule R when\n" + + " $a : StockTick( company == \"DROO\" )\n" + + " $b : StockTick( company == \"ACME\", this after[5s,8s] $a )\n" + + "then\n" + + " results.add(\"fired\");\n" + + "end\n"; @ParameterizedTest @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") // FULL fails with "ReliablePropagationList; no valid constructor" @@ -52,9 +52,9 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO SessionPseudoClock clock = getSessionClock(); - insert(new StockTick("DROO")); - clock.advanceTime(6, TimeUnit.SECONDS); - insert(new StockTick("ACME")); + insert( new StockTick( "DROO" ) ); + clock.advanceTime( 6, TimeUnit.SECONDS ); + insert( new StockTick( "ACME" ) ); //-- Assume JVM down here. Fail-over to other JVM or rebooted JVM //-- ksession and kbase are lost. CacheManager is recreated. Client knows only "id" @@ -66,15 +66,15 @@ void insertAdvanceInsertFailoverFire_shouldRecoverFromFailover(PersistedSessionO assertThat(getResults()).containsExactlyInAnyOrder("fired"); clearResults(); - clock.advanceTime(1, TimeUnit.SECONDS); - insert(new StockTick("ACME")); + clock.advanceTime( 1, TimeUnit.SECONDS ); + insert( new StockTick( "ACME" ) ); assertThat(fireAllRules()).isEqualTo(1); assertThat(getResults()).containsExactlyInAnyOrder("fired"); clearResults(); - clock.advanceTime(3, TimeUnit.SECONDS); - insert(new StockTick("ACME")); + clock.advanceTime( 3, TimeUnit.SECONDS ); + insert( new StockTick( "ACME" ) ); assertThat(fireAllRules()).isZero(); assertThat(getResults()).isEmpty(); @@ -122,9 +122,9 @@ void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionO createSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); SessionPseudoClock clock = getSessionClock(); - insert(new StockTick("DROO")); - clock.advanceTime(6, TimeUnit.SECONDS); - insert(new StockTick("ACME")); + insert( new StockTick( "DROO" ) ); + clock.advanceTime( 6, TimeUnit.SECONDS ); + insert( new StockTick( "ACME" ) ); failover(); restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); @@ -132,9 +132,9 @@ void insertAdvanceFailoverExpireFire_shouldExpireAfterFailover(PersistedSessionO clock.advanceTime(58, TimeUnit.SECONDS); assertThat(fireAllRules()).as("DROO is expired, but a match is available.") - .isEqualTo(1); + .isEqualTo(1); assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s") - .hasSize(1); + .hasSize(1); } @ParameterizedTest @@ -149,7 +149,7 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO insert(new StockTick("ACME")); assertThat(fireAllRules()).as("DROO is expired, but a match is available.") - .isEqualTo(1); + .isEqualTo(1); failover(); restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); @@ -159,7 +159,6 @@ void insertAdvanceFireFailoverExpire_shouldExpireAfterFailover(PersistedSessionO fireAllRules(); assertThat(getFactHandles()).as("DROO should have expired because @Expires = 60s") - .hasSize(1); + .hasSize(1); } - -} \ No newline at end of file +}